diff --git a/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp b/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp
index 74466ee488..ed9fc3a30d 100644
--- a/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp
+++ b/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp
@@ -52,8 +52,9 @@ struct PipelineMetrics {
 class OPENVINO_GENAI_EXPORTS ContinuousBatchingPipeline {
 protected:
-    class ImplInterface;
+    class IContinuousBatchingPipeline;
     class ContinuousBatchingImpl;
+    class ContinuousBatchingForSpeculativeDecodingImpl;
     class ContinuousBatchingForPromptLookupImpl;
     class SpeculativeDecodingImpl;
@@ -64,7 +65,7 @@ class OPENVINO_GENAI_EXPORTS ContinuousBatchingPipeline {
     friend class SpeculativeDecodingImpl;
     friend class PromptLookupImpl;
 
-    std::shared_ptr<ImplInterface> m_impl;
+    std::shared_ptr<IContinuousBatchingPipeline> m_impl;
 
     ContinuousBatchingPipeline() = default;
diff --git a/src/cpp/include/openvino/genai/lora_adapter.hpp b/src/cpp/include/openvino/genai/lora_adapter.hpp
index 277ec57cc3..b6b91bee20 100644
--- a/src/cpp/include/openvino/genai/lora_adapter.hpp
+++ b/src/cpp/include/openvino/genai/lora_adapter.hpp
@@ -188,7 +188,7 @@ class OPENVINO_GENAI_EXPORTS AdapterController {
     AdapterController(std::shared_ptr<ov::Model> model, const AdapterConfig& config, std::string device);
 
     // Apply adapters configured in the current config set last time, or set and use new config given as optional `config` argument
-    void apply(ov::InferRequest& request, const std::optional<AdapterConfig>& config = std::nullopt);
+    void apply(ov::InferRequest request, const std::optional<AdapterConfig>& config = std::nullopt);
 
     // Returns true if a given name is one of the state names created by this adapter controller for dynamic LoRA
     // Helps to distinguish LoRA states from other states (e.g. KV cache state) in the model for a partial state reset.
diff --git a/src/cpp/src/continuous_batching_impl.cpp b/src/cpp/src/continuous_batching_impl.cpp
index 52ec6a8302..9e20171dcb 100644
--- a/src/cpp/src/continuous_batching_impl.cpp
+++ b/src/cpp/src/continuous_batching_impl.cpp
@@ -5,6 +5,7 @@
 #include "continuous_batching_impl.hpp"
 #include "utils.hpp"
 #include "utils/paged_attention_transformations.hpp"
+#include "lora_helper.hpp"
 
 namespace ov::genai {
 template<class... Ts> struct overloaded : Ts... {using Ts::operator()...;};
@@ -17,8 +18,7 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::ContinuousBatchingImpl(
     const std::string& device,
     const ov::AnyMap& properties,
     const ov::genai::GenerationConfig& generation_config,
-    bool is_validation_mode_enabled
-    ) {
+    bool is_validation_mode_enabled) {
     m_tokenizer = tokenizer;
     m_generation_config = generation_config;
     m_is_validation_mode_enabled = is_validation_mode_enabled;
@@ -33,22 +33,33 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::ContinuousBatchingImpl(
     bool is_need_per_layer_cache_control = scheduler_config.use_cache_eviction;
     utils::apply_paged_attention_transformations(model, device_config, is_need_per_layer_cache_control);
 
-    init(model, scheduler_config, compile_properties, device_config, core);
+    initialize_pipeline(model, scheduler_config, compile_properties, device_config, core);
 }
 
 void ContinuousBatchingPipeline::ContinuousBatchingImpl::_pull_awaiting_requests() {
     std::lock_guard<std::mutex> lock{m_awaiting_requests_mutex};
     m_requests.insert(m_requests.end(), m_awaiting_requests.begin(), m_awaiting_requests.end());
     m_awaiting_requests.clear();
+    m_pipeline_metrics.requests = m_requests.size();
 }
 
-void ContinuousBatchingPipeline::ContinuousBatchingImpl::init(
+void ContinuousBatchingPipeline::ContinuousBatchingImpl::initialize_pipeline(
     std::shared_ptr<ov::Model> model,
     const SchedulerConfig& scheduler_config,
     const ov::AnyMap& properties,
     const DeviceConfig& device_config,
     ov::Core& core) {
-    auto compiled_model = core.compile_model(model, device_config.get_device(), properties);
+    ov::CompiledModel compiled_model;
+
+    // apply LoRA
+    if (auto filtered_properties = extract_adapters_from_properties(properties, &m_generation_config.adapters)) {
+        m_generation_config.adapters->set_tensor_name_prefix("base_model.model.model.");
+        m_adapter_controller = AdapterController(model, *m_generation_config.adapters, device_config.get_device());   // TODO: Make the prefix name configurable
+        compiled_model = core.compile_model(model, device_config.get_device(), *filtered_properties);
+    } else {
+        compiled_model = core.compile_model(model, device_config.get_device(), properties);
+    }
+
     ov::genai::utils::print_compiled_model_properties(compiled_model, "LLM with Paged Attention");
 
     ov::InferRequest infer_request = compiled_model.create_infer_request();
@@ -68,9 +79,12 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::init(
         can_use_partial_preemption = false;
     }
     m_scheduler = std::make_shared<Scheduler>(device_config.get_block_size(), m_cache_manager, updated_config, device_config.get_num_layers(), can_use_partial_preemption);
-    // and finally create model runner
+
+    // model runner
     bool is_use_cache_eviction = m_scheduler->get_config().use_cache_eviction;
     m_model_runner = std::make_shared<ModelRunner>(infer_request, m_scheduler->get_block_size(), device_config.get_num_layers(), is_use_cache_eviction);
+
+    // sampler
     m_sampler = std::make_shared<Sampler>(m_tokenizer);
     m_sampler->set_seed(m_generation_config.rng_seed);
@@ -94,6 +108,7 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::add_request(uint64_t request
                                                                 m_scheduler->get_block_size(),
                                                                 m_scheduler->get_config().enable_prefix_caching);
     sequence_group->set_sequence_group_ptr(sequence_group);
+
     if (m_scheduler->get_config().enable_prefix_caching) {
         m_scheduler->restore_cached_blocks(sequence_group);
     }
@@ -102,6 +117,7 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::add_request(uint64_t request
         std::lock_guard<std::mutex> lock{m_awaiting_requests_mutex};
         m_awaiting_requests.push_back(sequence_group);
     }
+
     return std::make_shared<GenerationHandleImpl>(sequence_group->get_generation_stream(), sampling_params);
 };
@@ -113,6 +129,7 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::add_request(uint64_t request
     timer.start();
     ov::Tensor input_ids = m_tokenizer.encode(prompt).input_ids;
     timer.end();
+
     return add_request(request_id, input_ids, sampling_params);
 }
 
@@ -127,24 +144,26 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() {
     _pull_awaiting_requests();
-    m_pipeline_metrics.requests = m_requests.size();
 
     Scheduler::Output scheduler_output;
     {
-        static ManualTimer timer("scheduling");
-        timer.start();
-        m_scheduler->clean_empty_blocks(m_requests);
+        static ManualTimer scheduling_timer("scheduling");
+        scheduling_timer.start();
         scheduler_output = m_scheduler->schedule(m_requests);
+        scheduling_timer.end();
+
+        m_pipeline_metrics.scheduled_requests = scheduler_output.m_scheduled_sequence_groups_ids.size();
         m_pipeline_metrics.cache_usage = scheduler_output.m_cache_usage;
-        m_pipeline_metrics.max_cache_usage =
-            std::max(m_pipeline_metrics.max_cache_usage, scheduler_output.m_cache_usage);
+        m_pipeline_metrics.max_cache_usage = std::max(m_pipeline_metrics.max_cache_usage, scheduler_output.m_cache_usage);
         _register_step_cache_usage(scheduler_output.m_cache_usage);
         m_pipeline_metrics.avg_cache_usage = _get_current_running_average_cache_usage();
+
+        static ManualTimer copy_blocks_timer("scheduling");
+        copy_blocks_timer.start();
         m_cache_manager->copy_blocks(scheduler_output.m_block_copy_map);
-        timer.end();
+        copy_blocks_timer.end();
     }
 
-    // if no tokens were scheduled, we are out of memory
+    // if no tokens were scheduled, we are out of memory => free all requests and return
     if (scheduler_output.m_total_num_scheduled_tokens == 0) {
         for (size_t i = 0; i < m_requests.size(); ++i) {
             SequenceGroup::Ptr sequence_group = m_requests[i];
@@ -166,15 +185,14 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() {
     }
 
 #ifdef DEBUG_CACHE_STATE_DUMP
-
     CacheStateDumper dumper(CacheStateDumper::get_run_id_for_generation_step(step_count, "before_eviction"));
     dumper.dump_cache_state(*m_scheduler, m_requests, step_count);
 #endif
-    const auto& sched_config = m_scheduler->get_config();
     // evict unimportant blocks from KV cache, if requested
+    const auto& sched_config = m_scheduler->get_config();
     if (sched_config.use_cache_eviction) {
-        maybe_evict_cache_blocks(sched_config);
+        _maybe_evict_cache_blocks(sched_config);
    }
 
 #ifdef DEBUG_CACHE_STATE_DUMP
@@ -183,6 +201,7 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() {
     step_count++;
 #endif
 
+    // process generation_config.echo parameter
     _fill_prompt_log_probs(m_requests, logits);
 
     SamplerOutput sampler_output;
@@ -195,8 +214,8 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() {
     // process sampler_output (e.g. fork or drop sequences from BlockScheduler)
     {
-        static ManualTimer timer("fork / free sequence");
-        timer.start();
+        static ManualTimer free_fork_timer("fork / free sequence");
+        free_fork_timer.start();
 
         for (const auto& pair : sampler_output.m_forked_sequences) {
             uint64_t parent_id = pair.first;
@@ -208,35 +227,49 @@ void ContinuousBatchingPipeline::ContinuousBatchingImpl::step() {
         for (auto seq_id : sampler_output.m_dropped_sequences)
             m_scheduler->free_sequence(seq_id);
 
-        timer.end();
+        free_fork_timer.end();
     }
 
     // notify requests dropped by handle
     {
-        static ManualTimer timer("notify requests dropped by handle");
-        timer.start();
+        static ManualTimer report_tokens_timer("notify requests dropped by handle");
+        report_tokens_timer.start();
         _notify_requests_dropped_by_handle();
-        timer.end();
+        report_tokens_timer.end();
     }
 
     // free non running requests for current step
     {
-        static ManualTimer timer("free non running requests");
-        timer.start();
+        static ManualTimer clean_up_requests_timer("free non running requests");
+        clean_up_requests_timer.start();
         _free_non_running_requests();
-        timer.end();
+        clean_up_requests_timer.end();
     }
 
     step_timer.end();
 }
 
+void ContinuousBatchingPipeline::ContinuousBatchingImpl::set_adapters(const std::optional<AdapterConfig>& adapters) {
+    if (m_adapter_controller) {
+        m_adapter_controller->apply(m_model_runner->get_infer_request(), adapters);
+    }
+}
+
 std::vector<EncodedGenerationResult>
 ContinuousBatchingPipeline::ContinuousBatchingImpl::generate(const std::vector<ov::Tensor>& input_ids,
                                                              const std::vector<GenerationConfig>& sampling_params,
                                                              const StreamerVariant& streamer) {
     OPENVINO_ASSERT(!has_non_finished_requests(), "Generate cannot be called while ContinuousBatchingPipeline is already in running state. Use ContinuousBatchingPipeline::add_request");
     OPENVINO_ASSERT(input_ids.size() == sampling_params.size());
+
+    // checks that all requests have the same LoRA adapters property value
+    for (size_t i = 1; i < sampling_params.size(); ++i) {
+        OPENVINO_ASSERT(sampling_params[i - 1].adapters == sampling_params[i].adapters,
+                        "LoRA adapters value must be the same for all requests");
+    }
+    set_adapters(sampling_params[0].adapters);
+
     const std::shared_ptr<StreamerBase>& streamer_ptr = std::visit(overloaded{
         [](std::monostate) -> std::shared_ptr<StreamerBase> {
             return nullptr;
@@ -375,7 +408,7 @@ float ContinuousBatchingPipeline::ContinuousBatchingImpl::_get_current_running_a
     return std::accumulate(m_previous_step_cache_usages.begin(), m_previous_step_cache_usages.end(), 0.0) / m_previous_step_cache_usages.size();
 }
 
-void ContinuousBatchingPipeline::ContinuousBatchingImpl::maybe_evict_cache_blocks(const SchedulerConfig& sched_config) {
+void ContinuousBatchingPipeline::ContinuousBatchingImpl::_maybe_evict_cache_blocks(const SchedulerConfig& sched_config) {
     std::unordered_map<SequenceGroup::Ptr, size_t> seq_group_to_num_blocks_evicted_map;
     auto sequence_attention_scores = m_model_runner->get_last_attention_scores();
     for (auto& seq_id_and_attention_scores : sequence_attention_scores) {
diff --git a/src/cpp/src/continuous_batching_impl.hpp b/src/cpp/src/continuous_batching_impl.hpp
index 8da05c6dfa..d319147f2c 100644
--- a/src/cpp/src/continuous_batching_impl.hpp
+++ b/src/cpp/src/continuous_batching_impl.hpp
@@ -3,16 +3,19 @@
 
 #pragma once
 
-#include "continuous_batching_impl_interface.hpp"
-#include "openvino/genai/continuous_batching_pipeline.hpp"
+#include "icontinuous_batching.hpp"
+
+#include "openvino/genai/lora_adapter.hpp"
 #include "cache_eviction.hpp"
 
 namespace ov::genai {
-class ContinuousBatchingPipeline::ContinuousBatchingImpl : public ContinuousBatchingPipeline::ImplInterface {
+
+class ContinuousBatchingPipeline::ContinuousBatchingImpl : public ContinuousBatchingPipeline::IContinuousBatchingPipeline {
 protected:
     std::shared_ptr<Scheduler> m_scheduler;
     std::shared_ptr<CacheManager> m_cache_manager;
     std::shared_ptr<ModelRunner> m_model_runner;
+    std::optional<AdapterController> m_adapter_controller;
     std::shared_ptr<Sampler> m_sampler;
 
     // current requests to process
@@ -26,7 +29,7 @@ class ContinuousBatchingPipeline::ContinuousBatc
     static const size_t AVG_CACHE_USAGE_WINDOW_SIZE_IN_STEPS = 1000;
     std::deque<float> m_previous_step_cache_usages;
-    
+
     // flag to enable validation mode for sampler
     bool m_is_validation_mode_enabled = false;
@@ -37,21 +40,41 @@ class ContinuousBatchingPipeline::ContinuousBatc
     // used by tests only
     ContinuousBatchingImpl() = default;
 
+    void initialize_pipeline(std::shared_ptr<ov::Model> model,
+                             const SchedulerConfig& scheduler_config,
+                             const ov::AnyMap& plugin_config,
+                             const DeviceConfig& device_config,
+                             ov::Core& core);
+
+    /**
+     * Pulls requests from awaiting queue to running queue
+     * Should be called within each call of step()
+     */
+    virtual void _pull_awaiting_requests();
+
+    /**
+     * Releases non-running (finished, dropped or OOM) requests from running queue
+     */
     void _free_non_running_requests();
+
+    /**
+     * Notify dropped requests by pushing empty output
+     */
     void _notify_requests_dropped_by_handle();
 
-    void _register_step_cache_usage(float step_cache_usage);
-    float _get_current_running_average_cache_usage() const;
-    void maybe_evict_cache_blocks(const SchedulerConfig& sched_config);
-    void init(std::shared_ptr<ov::Model> model,
-              const SchedulerConfig& scheduler_config,
-              const ov::AnyMap& plugin_config,
-              const DeviceConfig& device_config,
-              ov::Core& core);
+    /**
+     * Handles 'echo' generation parameter
+     */
+    void _fill_prompt_log_probs(std::vector<SequenceGroup::Ptr>& sequence_groups, ov::Tensor& logits);
 
-    virtual void _pull_awaiting_requests();
+    /**
+     * Performs KV cache eviction if enabled / required
+     */
+    void _maybe_evict_cache_blocks(const SchedulerConfig& sched_config);
+
+    void _register_step_cache_usage(float step_cache_usage);
+    float _get_current_running_average_cache_usage() const;
 
-    void _fill_prompt_log_probs(std::vector<SequenceGroup::Ptr>& sequence_groups, ov::Tensor& logits);
 public:
     ContinuousBatchingImpl(const std::shared_ptr<ov::Model>& model,
                            const Tokenizer& tokenizer,
@@ -64,6 +87,7 @@ class ContinuousBatchingPipeline::ContinuousBatc
     GenerationHandle add_request(uint64_t request_id,
                                  const ov::Tensor& input_ids,
                                  ov::genai::GenerationConfig sampling_params) override;
+
     GenerationHandle add_request(uint64_t request_id,
                                  const std::string& prompt,
                                  ov::genai::GenerationConfig sampling_params) override;
@@ -76,5 +100,11 @@ class ContinuousBatchingPipeline::ContinuousBatc
     generate(const std::vector<ov::Tensor>& input_ids,
              const std::vector<GenerationConfig>& sampling_params,
              const StreamerVariant& streamer) override;
+
+    /**
+     * Updates LoRA adapters for current generation call
+     */
+    void set_adapters(const std::optional<AdapterConfig>& adapters);
 };
-}
\ No newline at end of file
+
+} // namespace ov::genai
diff --git a/src/cpp/src/continuous_batching_pipeline.cpp b/src/cpp/src/continuous_batching_pipeline.cpp
index 148eb2fa9f..8b7003e4ab 100644
--- a/src/cpp/src/continuous_batching_pipeline.cpp
+++ b/src/cpp/src/continuous_batching_pipeline.cpp
@@ -47,19 +47,20 @@ ContinuousBatchingPipeline::ContinuousBatchingPipeline( const std::filesystem::p
     auto properties_without_draft_model = properties;
     auto draft_model_desr = extract_draft_model_from_config(properties_without_draft_model);
     auto is_prompt_lookup_enabled = extract_prompt_lookup_from_config(properties_without_draft_model);
-    
+
     std::filesystem::path openvino_model_name = "openvino_model.xml";
     auto model = utils::singleton_core().read_model((models_path / openvino_model_name).string());
     auto tokenizer = ov::genai::Tokenizer(models_path, tokenizer_properties);
     auto generation_config = utils::from_config_json_if_exists(models_path);
+
     if (is_prompt_lookup_enabled) {
-        OPENVINO_ASSERT(draft_model_desr.model == nullptr, "Speculative decoding and prompt lookup decoding are mutually excluded");
+        OPENVINO_ASSERT(draft_model_desr.model == nullptr, "Speculative decoding and prompt lookup decoding are mutually exclusive");
         m_impl = std::make_shared<PromptLookupImpl>(model, tokenizer, scheduler_config, device, properties_without_draft_model, generation_config);
-    } else if (draft_model_desr.model == nullptr) {
-        m_impl = std::make_shared<ContinuousBatchingImpl>(model, tokenizer, scheduler_config, device, properties, generation_config);
-    } else {
+    } else if (draft_model_desr.model != nullptr) {
         auto main_model_descr = ov::genai::ModelDesc(model, tokenizer, device, properties_without_draft_model, scheduler_config, generation_config);
         m_impl = std::make_shared<SpeculativeDecodingImpl>(main_model_descr, draft_model_desr);
+    } else {
+        m_impl = std::make_shared<ContinuousBatchingImpl>(model, tokenizer, scheduler_config, device, properties, generation_config);
     }
 }
 
@@ -77,13 +78,13 @@ ContinuousBatchingPipeline::ContinuousBatchingPipeline(
     auto generation_config = utils::from_config_json_if_exists(models_path);
 
     if (is_prompt_lookup_enabled) {
-        OPENVINO_ASSERT(draft_model_desr.model == nullptr, "Speculative decoding and prompt lookup decoding are mutually excluded");
+        OPENVINO_ASSERT(draft_model_desr.model == nullptr, "Speculative decoding and prompt lookup decoding are mutually exclusive");
         m_impl = std::make_shared<PromptLookupImpl>(model, tokenizer, scheduler_config, device, properties_without_draft_model, generation_config);
-    } else if (draft_model_desr.model == nullptr) {
-        m_impl = std::make_shared<ContinuousBatchingImpl>(model, tokenizer, scheduler_config, device, properties, generation_config);
-    } else {
+    } else if (draft_model_desr.model != nullptr) {
         auto main_model_descr = ov::genai::ModelDesc(model, tokenizer, device, properties_without_draft_model, scheduler_config, generation_config);
         m_impl = std::make_shared<SpeculativeDecodingImpl>(main_model_descr, draft_model_desr);
+    } else {
+        m_impl = std::make_shared<ContinuousBatchingImpl>(model, tokenizer, scheduler_config, device, properties, generation_config);
     }
 }
 
@@ -101,13 +102,13 @@ ContinuousBatchingPipeline::ContinuousBatchingPipeline(
     auto model = utils::singleton_core().read_model(model_str, weights_tensor);
 
     if (is_prompt_lookup_enabled) {
-        OPENVINO_ASSERT(draft_model_desr.model == nullptr, "Speculative decoding and prompt lookup decoding are mutually excluded");
+        OPENVINO_ASSERT(draft_model_desr.model == nullptr, "Speculative decoding and prompt lookup decoding are mutually exclusive");
         m_impl = std::make_shared<PromptLookupImpl>(model, tokenizer, scheduler_config, device, properties_without_draft_model, generation_config);
-    } else if (draft_model_desr.model == nullptr) {
-        m_impl = std::make_shared<ContinuousBatchingImpl>(model, tokenizer, scheduler_config, device, properties, generation_config);
-    } else {
+    } else if (draft_model_desr.model != nullptr) {
         auto main_model_descr = ov::genai::ModelDesc(model, tokenizer, device, properties_without_draft_model, scheduler_config, generation_config);
-            m_impl = std::make_shared<SpeculativeDecodingImpl>(main_model_descr, draft_model_desr);
+        m_impl = std::make_shared<SpeculativeDecodingImpl>(main_model_descr, draft_model_desr);
+    } else {
+        m_impl = std::make_shared<ContinuousBatchingImpl>(model, tokenizer, scheduler_config, device, properties, generation_config);
     }
 }
diff --git a/src/cpp/src/continuous_batching_impl_interface.cpp b/src/cpp/src/icontinuous_batching.cpp
similarity index 79%
rename from src/cpp/src/continuous_batching_impl_interface.cpp
rename to src/cpp/src/icontinuous_batching.cpp
index 10fc102aa0..e32616b0aa 100644
--- a/src/cpp/src/continuous_batching_impl_interface.cpp
+++ b/src/cpp/src/icontinuous_batching.cpp
@@ -1,40 +1,41 @@
 // Copyright (C) 2023-2024 Intel Corporation
 // SPDX-License-Identifier: Apache-2.0
 
-#include "continuous_batching_impl_interface.hpp"
+#include "icontinuous_batching.hpp"
 
 namespace ov::genai {
 
-GenerationConfig ContinuousBatchingPipeline::ImplInterface::get_config() const {
+GenerationConfig ContinuousBatchingPipeline::IContinuousBatchingPipeline::get_config() const {
     return m_generation_config;
 }
 
-PipelineMetrics ContinuousBatchingPipeline::ImplInterface::get_metrics() const {
+PipelineMetrics ContinuousBatchingPipeline::IContinuousBatchingPipeline::get_metrics() const {
     return m_pipeline_metrics;
 }
 
-Tokenizer ContinuousBatchingPipeline::ImplInterface::get_tokenizer() {
+Tokenizer ContinuousBatchingPipeline::IContinuousBatchingPipeline::get_tokenizer() {
     return m_tokenizer;
 }
 
-void ContinuousBatchingPipeline::ImplInterface::start_chat(const std::string& system_message) {
+void ContinuousBatchingPipeline::IContinuousBatchingPipeline::start_chat(const std::string& system_message) {
     if (!system_message.empty()) {
         m_history.push_back({{"role", "system"}, {"content", system_message}});
     }
     m_is_chat_conversation = true;
 };
 
-void ContinuousBatchingPipeline::ImplInterface::finish_chat() {
+void ContinuousBatchingPipeline::IContinuousBatchingPipeline::finish_chat() {
     m_is_chat_conversation = false;
     m_history.clear();
 };
 
 std::vector<GenerationResult>
-ContinuousBatchingPipeline::ImplInterface::generate(
+ContinuousBatchingPipeline::IContinuousBatchingPipeline::generate(
     const std::vector<std::string>& prompts,
     std::vector<GenerationConfig> sampling_params,
     const StreamerVariant& streamer) {
     std::vector<ov::Tensor> input_ids;
+
     static ManualTimer timer("tokenize");
 
     if (m_is_chat_conversation) {
         OPENVINO_ASSERT(1 == prompts.size(), "Can't chat with multiple prompts");
@@ -47,13 +48,15 @@ ContinuousBatchingPipeline::ImplInterface::generate(
         timer.end();
     } else {
         input_ids.reserve(prompts.size());
+        timer.start();
         for (const std::string& prompt : prompts) {
-            timer.start();
             input_ids.push_back(m_tokenizer.encode(prompt).input_ids);
-            timer.end();
         }
+        timer.end();
     }
+
     std::vector<EncodedGenerationResult> encoded = generate(input_ids, sampling_params, streamer);
+
     std::vector<GenerationResult> decoded;
     decoded.reserve(encoded.size());
     for (EncodedGenerationResult& res : encoded) {
@@ -65,6 +68,7 @@ ContinuousBatchingPipeline::ImplInterface::generate(
                 m_history.push_back({{"role", "assistant"}, {"content", generated.back()}});
             }
         }
+
         decoded.push_back(GenerationResult{
             res.m_request_id,
             std::move(generated),
@@ -72,6 +76,7 @@ ContinuousBatchingPipeline::ImplInterface::generate(
             res.m_status
         });
     }
+
     return decoded;
 }
-}
\ No newline at end of file
+}
diff --git a/src/cpp/src/continuous_batching_impl_interface.hpp b/src/cpp/src/icontinuous_batching.hpp
similarity index 72%
rename from src/cpp/src/continuous_batching_impl_interface.hpp
rename to src/cpp/src/icontinuous_batching.hpp
index 909383c98a..12030f06f7 100644
--- a/src/cpp/src/continuous_batching_impl_interface.hpp
+++ b/src/cpp/src/icontinuous_batching.hpp
@@ -12,7 +12,10 @@
 
 namespace ov::genai {
 
-class ContinuousBatchingPipeline::ImplInterface {
+/**
+ * Base interface for all continuous batching based pipelines
+ */
+class ContinuousBatchingPipeline::IContinuousBatchingPipeline {
 protected:
     Tokenizer m_tokenizer;
@@ -35,6 +38,7 @@ class ContinuousBatchingPipeline::ImplInterface {
             // std::cout << std::endl;
         }
     } m_perf;
+
     bool m_is_chat_conversation = false;
     ChatHistory m_history;
 
@@ -43,27 +47,57 @@ class ContinuousBatchingPipeline::ImplInterface {
     PipelineMetrics get_metrics() const;
     ov::genai::Tokenizer get_tokenizer();
 
+    /**
+     * Adds requests to awaiting queue using encoded inputs
+     */
     virtual GenerationHandle add_request(uint64_t request_id,
                                          const ov::Tensor& input_ids,
                                          ov::genai::GenerationConfig sampling_params) = 0;
+
+    /**
+     * Adds request to running queue based on string input
+     * This step also performs tokenization (encode)
+     */
     virtual GenerationHandle add_request(uint64_t request_id,
                                          const std::string& prompt,
                                          ov::genai::GenerationConfig sampling_params) = 0;
 
+    /**
+     * Checks whether server (pipeline) has non-finished requests and step() should be called within a loop
+     */
     virtual bool has_non_finished_requests() = 0;
 
+    /**
+     * Performs a single inference step of all running (and pulls awaiting) requests
+     */
     virtual void step() = 0;
 
+    /**
+     * Performs monolithic generation based on encoded prompts
+     */
     virtual std::vector<EncodedGenerationResult>
     generate(const std::vector<ov::Tensor>& input_ids,
              const std::vector<GenerationConfig>& sampling_params,
             const StreamerVariant& streamer) = 0;
+
+    /**
+     * Performs monolithic generation based on text prompts
+     */
     std::vector<GenerationResult>
     generate(const std::vector<std::string>& prompts,
              std::vector<GenerationConfig> sampling_params,
             const StreamerVariant& streamer);
 
+    /**
+     * Starts chat with a given system prompt
+     *
+     * In chat scenario, prompts passed to `generate` method are accumulated inside the pipeline until `finish_chat` is called
+     */
     void start_chat(const std::string& system_message);
+
+    /**
+     * Ends chat
+     */
     void finish_chat();
 };
 }
\ No newline at end of file
diff --git a/src/cpp/src/llm_pipeline.cpp b/src/cpp/src/llm_pipeline.cpp
index 3e378e78cf..74fe821a5e 100644
--- a/src/cpp/src/llm_pipeline.cpp
+++ b/src/cpp/src/llm_pipeline.cpp
@@ -15,7 +15,6 @@
 #include "llm_pipeline_static.hpp"
 #include "utils.hpp"
 #include "text_callback_streamer.hpp"
-#include "openvino/genai/lora_adapter.hpp"
 #include "lora_helper.hpp"
 #include "speculative_decoding/speculative_decoding_impl.hpp"
 #include "sampler.hpp"
diff --git a/src/cpp/src/lora_adapter.cpp b/src/cpp/src/lora_adapter.cpp
index fd446ef708..e060e55160 100644
--- a/src/cpp/src/lora_adapter.cpp
+++ b/src/cpp/src/lora_adapter.cpp
@@ -1305,7 +1305,7 @@ AdapterController::AdapterController(std::shared_ptr<ov::Model> model, const Ada
 
 // Call it every time when adapter config is changed; if adapter was configured as a static one, this call is not required
-void AdapterController::apply(ov::InferRequest& request, const std::optional<AdapterConfig>& config) {
+void AdapterController::apply(ov::InferRequest request, const std::optional<AdapterConfig>& config) {
     OPENVINO_ASSERT(m_pimpl || !config || !*config,
         "Adapters are passed to AdapterController but it was not configured to use adapters. "
         "Enable using adapters by pass them in the constructor first.");
diff --git a/src/cpp/src/model_runner.hpp b/src/cpp/src/model_runner.hpp
index 1b96cdc505..abc96ac423 100644
--- a/src/cpp/src/model_runner.hpp
+++ b/src/cpp/src/model_runner.hpp
@@ -52,7 +52,7 @@ class ModelRunner {
     /**
     * @return The ov::InferRequest this ModelRunner is handling.
     */
-    ov::InferRequest get_infer_request() const {
+    ov::InferRequest get_infer_request() {
        return m_request;
    }
diff --git a/src/cpp/src/prompt_lookup/continuous_batching_for_prompt_lookup.cpp b/src/cpp/src/prompt_lookup/continuous_batching_for_prompt_lookup.cpp
index 8c9e520728..ffc8a8aab2 100644
--- a/src/cpp/src/prompt_lookup/continuous_batching_for_prompt_lookup.cpp
+++ b/src/cpp/src/prompt_lookup/continuous_batching_for_prompt_lookup.cpp
@@ -82,4 +82,5 @@ void ContinuousBatchingPipeline::ContinuousBatchingForPromptLookupImpl::generate
             request->set_num_validated_tokens(max_validation_len);
         }
     }
+
 }
\ No newline at end of file
diff --git a/src/cpp/src/prompt_lookup/prompt_lookup_impl.cpp b/src/cpp/src/prompt_lookup/prompt_lookup_impl.cpp
index f934a56939..7a893a2603 100644
--- a/src/cpp/src/prompt_lookup/prompt_lookup_impl.cpp
+++ b/src/cpp/src/prompt_lookup/prompt_lookup_impl.cpp
@@ -73,10 +73,19 @@
 std::vector<EncodedGenerationResult>
 ContinuousBatchingPipeline::PromptLookupImpl::generate(const std::vector<ov::Tensor>& input_ids,
                                                        const std::vector<GenerationConfig>& sampling_params,
                                                        const StreamerVariant& streamer) {
-    ManualTimer generate_timer("speculative_decoding: generate()");
-    generate_timer.start();
     OPENVINO_ASSERT(!has_non_finished_requests(), "Generate cannot be called while ContinuousBatchingPipeline is already in running state. Use ContinuousBatchingPipeline::add_request");
     OPENVINO_ASSERT(input_ids.size() == sampling_params.size());
+
+    ManualTimer generate_timer("speculative_decoding: generate()");
+    generate_timer.start();
+
+    // checks that all requests have the same LoRA adapters property value
+    for (size_t i = 1; i < sampling_params.size(); ++i) {
+        OPENVINO_ASSERT(sampling_params[i - 1].adapters == sampling_params[i].adapters,
+                        "LoRA adapters value must be the same for all requests");
+    }
+    m_pipeline->set_adapters(sampling_params[0].adapters);
+
     const std::shared_ptr<StreamerBase>& streamer_ptr = std::visit(overloaded{
         [](std::monostate) -> std::shared_ptr<StreamerBase> {
             return nullptr;
diff --git a/src/cpp/src/prompt_lookup/prompt_lookup_impl.hpp b/src/cpp/src/prompt_lookup/prompt_lookup_impl.hpp
index dae721741b..0c05c2afd0 100644
--- a/src/cpp/src/prompt_lookup/prompt_lookup_impl.hpp
+++ b/src/cpp/src/prompt_lookup/prompt_lookup_impl.hpp
@@ -11,11 +11,11 @@
 
 namespace ov::genai {
 
-class ContinuousBatchingPipeline::PromptLookupImpl : public ContinuousBatchingPipeline::ImplInterface {
+class ContinuousBatchingPipeline::PromptLookupImpl : public ContinuousBatchingPipeline::IContinuousBatchingPipeline {
 protected:
     std::shared_ptr<ContinuousBatchingForPromptLookupImpl> m_pipeline;
     SpeculativeDecodingMetrics m_sd_metrics;
-    
+
 public:
     PromptLookupImpl(const std::shared_ptr<ov::Model>& model,
                      const Tokenizer& tokenizer,
diff --git a/src/cpp/src/scheduler.hpp b/src/cpp/src/scheduler.hpp
index da65c68bec..0057b19329 100644
--- a/src/cpp/src/scheduler.hpp
+++ b/src/cpp/src/scheduler.hpp
@@ -56,6 +56,10 @@ class Scheduler {
     Output schedule(std::vector<SequenceGroup::Ptr>& sequence_groups) {
         Output scheduler_output;
+
+        // free some blocks taken by non-confirmed candidates in SD / prompt look-up
+        clean_empty_blocks(sequence_groups);
+
         if (m_block_manager.get_total_number_of_kv_blocks() == 0) {
             _initialize_cache(sequence_groups);
         }
@@ -84,6 +88,10 @@ class Scheduler {
         return scheduler_output;
     }
 
+    /**
+     * Some requests can contain empty blocks after prompt look-up or speculative decoding
+     * when candidates are not confirmed by the main model, so the blocks taken by these candidates need to be freed
+     */
     void clean_empty_blocks(std::vector<SequenceGroup::Ptr>& seq_groups) {
         for (const auto& seq_group : seq_groups)
             m_block_manager.free_empty_physical_blocks(seq_group);
diff --git a/src/cpp/src/speculative_decoding/continuous_batching_for_speculative_decoding_impl.cpp b/src/cpp/src/speculative_decoding/continuous_batching_for_speculative_decoding_impl.cpp
index 36f274f30f..5091218ccd 100644
--- a/src/cpp/src/speculative_decoding/continuous_batching_for_speculative_decoding_impl.cpp
+++ b/src/cpp/src/speculative_decoding/continuous_batching_for_speculative_decoding_impl.cpp
@@ -17,7 +17,7 @@ ContinuousBatchingPipeline::ContinuousBatchingForSpeculativeDecodingImpl::Contin
     m_tokenizer = tokenizer;
     m_generation_config = generation_config;
     m_is_validation_mode_enabled = is_validation_mode_enabled;
-    init(model, scheduler_config, plugin_config, device_config, core);
+    initialize_pipeline(model, scheduler_config, plugin_config, device_config, core);
 }
 
 void
diff --git a/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp b/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp
index 257c20bf01..4021742961 100644
--- a/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp
+++ b/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp
@@ -193,10 +193,20 @@
 std::vector<EncodedGenerationResult>
 ContinuousBatchingPipeline::SpeculativeDecodingImpl::generate(const std::vector<ov::Tensor>& input_ids,
                                                               const std::vector<GenerationConfig>& sampling_params,
                                                               const StreamerVariant& streamer) {
-    ManualTimer generate_timer("speculative_decoding: generate()");
-    generate_timer.start();
     OPENVINO_ASSERT(!has_non_finished_requests(), "Generate cannot be called while ContinuousBatchingPipeline is already in running state. Use ContinuousBatchingPipeline::add_request");
     OPENVINO_ASSERT(input_ids.size() == sampling_params.size());
+
+    ManualTimer generate_timer("speculative_decoding: generate()");
+    generate_timer.start();
+
+    // checks that all requests have the same LoRA adapters property value
+    for (size_t i = 1; i < sampling_params.size(); ++i) {
+        OPENVINO_ASSERT(sampling_params[i - 1].adapters == sampling_params[i].adapters,
+                        "LoRA adapters value must be the same for all requests");
+    }
+    m_main_pipeline->set_adapters(sampling_params[0].adapters);
+    m_draft_pipeline->set_adapters(sampling_params[0].adapters);
+
     const std::shared_ptr<StreamerBase>& streamer_ptr = std::visit(overloaded{
         [](std::monostate) -> std::shared_ptr<StreamerBase> {
             return nullptr;
diff --git a/src/cpp/src/speculative_decoding/speculative_decoding_impl.hpp b/src/cpp/src/speculative_decoding/speculative_decoding_impl.hpp
index 3df02ac394..2f8067cbab 100644
--- a/src/cpp/src/speculative_decoding/speculative_decoding_impl.hpp
+++ b/src/cpp/src/speculative_decoding/speculative_decoding_impl.hpp
@@ -34,7 +34,7 @@ struct ModelDesc {
     ModelDesc() = default;
 };
 
-class ContinuousBatchingPipeline::SpeculativeDecodingImpl : public ContinuousBatchingPipeline::ImplInterface {
+class ContinuousBatchingPipeline::SpeculativeDecodingImpl : public ContinuousBatchingPipeline::IContinuousBatchingPipeline {
 protected:
     std::shared_ptr<ContinuousBatchingForSpeculativeDecodingImpl> m_main_pipeline, m_draft_pipeline;
     SpeculativeDecodingMetrics m_sd_metrics;
diff --git a/tests/cpp/CMakeLists.txt b/tests/cpp/CMakeLists.txt
index b8c2e625c5..5880010841 100644
--- a/tests/cpp/CMakeLists.txt
+++ b/tests/cpp/CMakeLists.txt
@@ -23,6 +23,8 @@ file(GLOB src_files
     "${OpenVINOGenAI_SOURCE_DIR}/src/cpp/src/sequence_group.cpp"
     "${OpenVINOGenAI_SOURCE_DIR}/src/cpp/src/utils/*.cpp"
     "${OpenVINOGenAI_SOURCE_DIR}/src/cpp/src/utils.cpp"
     "${OpenVINOGenAI_SOURCE_DIR}/src/cpp/src/continuous_batching*.cpp"
+    "${OpenVINOGenAI_SOURCE_DIR}/src/cpp/src/icontinuous_batching.cpp"
"${OpenVINOGenAI_SOURCE_DIR}/src/cpp/src/lora_helper.cpp" "${OpenVINOGenAI_SOURCE_DIR}/src/cpp/src/text_callback_streamer.cpp") add_executable(${TEST_TARGET_NAME} ${tests_src})