Skip to content

Commit

Permalink
Add iterable streamer to multinomial sample (#717)
Browse files Browse the repository at this point in the history
Co-authored-by: Zlobin Vladimir <[email protected]>
Co-authored-by: Ekaterina Aidova <[email protected]>
  • Loading branch information
3 people authored Aug 6, 2024
1 parent a295fe1 commit cfefc25
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 18 deletions.
8 changes: 8 additions & 0 deletions samples/python/multinomial_causal_lm/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

This example showcases inference of text-generation Large Language Models (LLMs): `chatglm`, `LLaMA`, `Qwen` and other models with the same signature. The application doesn't have many configuration options to encourage the reader to explore and modify the source code. For example, change the device for inference to GPU. The sample fearures `ov::genai::LLMPipeline` and configures it to run random sampling algorithm. There is also a Jupyter [notebook](https://github.com/openvinotoolkit/openvino_notebooks/tree/latest/notebooks/llm-chatbot) which provides an example of LLM-powered Chatbot in Python.

This sample also contains example implementation of an iterable streamer with bufferisation.

## Download and convert the model and tokenizers

The `--upgrade-strategy eager` option is needed to ensure `optimum-intel` is upgraded to the latest version.
Expand All @@ -22,6 +24,12 @@ Discrete GPUs (dGPUs) usually provide better performance compared to CPUs. It is

See https://github.com/openvinotoolkit/openvino.genai/blob/master/src/README.md#supported-models for the list of supported models.

## Streaming

This Python example demonstrates custom detokenization with bufferization. The streamer receives integer tokens corresponding to each word or subword, one by one. If tokens are decoded individually, the resulting text misses necessary spaces because of detokenize(tokenize(" a")) == "a".

To address this, the detokenizer needs a larger context. We accumulate tokens in a tokens_cache buffer and decode multiple tokens together, adding the text to the streaming queue only when a complete decoded chunk is ready. We run a separate thread to print all new elements arriving in this queue from the generation pipeline. Each generated chunk of text is put into a synchronized queue, ensuring that all put and get operations are thread-safe and blocked until they can proceed.

### Troubleshooting

#### Unicode characters encoding error on Windows
Expand Down
133 changes: 125 additions & 8 deletions samples/python/multinomial_causal_lm/multinomial_causal_lm.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,120 @@

import argparse
import openvino_genai
import queue
import threading


def streamer(subword):
print(subword, end='', flush=True)
return False
class IterableStreamer(openvino_genai.StreamerBase):
"""
A custom streamer class for handling token streaming and detokenization with buffering.
Attributes:
tokenizer (Tokenizer): The tokenizer used for encoding and decoding tokens.
tokens_cache (list): A buffer to accumulate tokens for detokenization.
text_queue (Queue): A synchronized queue for storing decoded text chunks.
print_len (int): The length of the printed text to manage incremental decoding.
"""

def __init__(self, tokenizer):
"""
Initializes the IterableStreamer with the given tokenizer.
Args:
tokenizer (Tokenizer): The tokenizer to use for encoding and decoding tokens.
"""
super().__init__()
self.tokenizer = tokenizer
self.tokens_cache = []
self.text_queue = queue.Queue()
self.print_len = 0

def __iter__(self):
"""
Returns the iterator object itself.
"""
return self

def __next__(self):
"""
Returns the next value from the text queue.
Returns:
str: The next decoded text chunk.
Raises:
StopIteration: If there are no more elements in the queue.
"""
value = self.text_queue.get() # get() will be blocked until a token is available.
if value is None:
raise StopIteration
return value

def get_stop_flag(self):
"""
Checks whether the generation process should be stopped.
Returns:
bool: Always returns False in this implementation.
"""
return False

def put_word(self, word: str):
"""
Puts a word into the text queue.
Args:
word (str): The word to put into the queue.
"""
self.text_queue.put(word)

def put(self, token_id: int) -> bool:
"""
Processes a token and manages the decoding buffer. Adds decoded text to the queue.
Args:
token_id (int): The token_id to process.
Returns:
bool: True if generation should be stopped, False otherwise.
"""
self.tokens_cache.append(token_id)
text = self.tokenizer.decode(self.tokens_cache)

word = ''
if len(text) > self.print_len and '\n' == text[-1]:
# Flush the cache after the new line symbol.
word = text[self.print_len:]
self.tokens_cache = []
self.print_len = 0
elif len(text) >= 3 and text[-3:] == chr(65533):
# Don't print incomplete text.
pass
elif len(text) > self.print_len:
# It is possible to have a shorter text after adding new token.
# Print to output only if text lengh is increaesed.
word = text[self.print_len:]
self.print_len = len(text)
self.put_word(word)

if self.get_stop_flag():
# When generation is stopped from streamer then end is not called, need to call it here manually.
self.end()
return True # True means stop generation
else:
return False # False means continue generation

def end(self):
"""
Flushes residual tokens from the buffer and puts a None value in the queue to signal the end.
"""
text = self.tokenizer.decode(self.tokens_cache)
if len(text) > self.print_len:
word = text[self.print_len:]
self.put_word(word)
self.tokens_cache = []
self.print_len = 0
self.put_word(None)


def main():
Expand All @@ -19,17 +128,25 @@ def main():

device = 'CPU' # GPU can be used as well
pipe = openvino_genai.LLMPipeline(args.model_dir, device)


text_print_streamer = IterableStreamer(pipe.get_tokenizer())
def token_printer():
# Getting next elements from iterable will be blocked until a new token is available.
for word in text_print_streamer:
print(word, end='', flush=True)
printer_thread = threading.Thread(target=token_printer, daemon=True)
printer_thread.start()

config = openvino_genai.GenerationConfig()
config.max_new_tokens = 100
config.do_sample = True
config.top_p = 0.9
config.top_k = 30

# Since the streamer is set, the results will
# be printed each time a new token is generated.
pipe.generate(args.prompt, config, streamer)

# Since the streamer is set, the results will be printed
# every time a new token is generated and put into the streamer queue.
pipe.generate(args.prompt, config, text_print_streamer)
printer_thread.join()

if '__main__' == __name__:
main()
43 changes: 33 additions & 10 deletions src/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ int main(int argc, char* argv[]) {
```

Streaming with a custom class:

C++ template for a stremer.
```cpp
#include "openvino/genai/streamer_base.hpp"
#include "openvino/genai/llm_pipeline.hpp"
Expand All @@ -186,18 +188,14 @@ Streaming with a custom class:
class CustomStreamer: public ov::genai::StreamerBase {
public:
bool put(int64_t token) {
bool stop_flag = false;
/*
custom decoding/tokens processing code
tokens_cache.push_back(token);
std::string text = m_tokenizer.decode(tokens_cache);
...
*/
return stop_flag; // flag whether generation should be stoped, if true generation stops.
// Custom decoding/tokens processing logic.

// Returns a flag whether generation should be stoped, if true generation stops.
return false;
};

void end() {
/* custom finalization */
// Custom finalization logic.
};
};

Expand All @@ -206,10 +204,35 @@ int main(int argc, char* argv[]) {

std::string model_path = argv[1];
ov::genai::LLMPipeline pipe(model_path, "CPU");
std::cout << pipe.generate("The Sun is yellow because", ov::genai::streamer(custom_streamer), ov::genai::max_new_tokens(200));
std::cout << pipe.generate("The Sun is yellow because", ov::genai::max_new_tokens(15), ov::genai::streamer(custom_streamer));
}
```
Python template for a streamer.
```py
import openvino_genai as ov_genai
class CustomStreamer(ov_genai.StreamerBase):
def __init__(self):
super().__init__()
# Initialization logic.
def put(self, token_id) -> bool:
# Custom decoding/tokens processing logic.
# Returns a flag whether generation should be stoped, if true generation stops.
return False
def end(self):
# Custom finalization logic.
pipe = ov_genai.LLMPipeline(model_path, "CPU")
custom_streamer = CustomStreamer()
pipe.generate("The Sun is yellow because", max_new_tokens=15, streamer=custom_streamer)
```
For fully implemented iterable CustomStreamer please refer to [multinomial_causal_lm](https://github.com/openvinotoolkit/openvino.genai/tree/releases/2024/3/samples/python/multinomial_causal_lm/README.md) sample.

### Performance Metrics

`openvino_genai.PerfMetrics` (referred as `PerfMetrics` for simplicity) is a structure that holds performance metrics for each generate call. `PerfMetrics` holds fields with mean and standard deviations for the following metrics:
Expand Down

0 comments on commit cfefc25

Please sign in to comment.