Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add iterable streamer to multinomial sample #717

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions samples/python/multinomial_causal_lm/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

This example showcases inference of text-generation Large Language Models (LLMs): `chatglm`, `LLaMA`, `Qwen` and other models with the same signature. The application doesn't have many configuration options to encourage the reader to explore and modify the source code. For example, change the device for inference to GPU. The sample fearures `ov::genai::LLMPipeline` and configures it to run random sampling algorithm. There is also a Jupyter [notebook](https://github.com/openvinotoolkit/openvino_notebooks/tree/latest/notebooks/llm-chatbot) which provides an example of LLM-powered Chatbot in Python.

This sample also contains example implementation of an iterable streamer with bufferisation.

## Download and convert the model and tokenizers

The `--upgrade-strategy eager` option is needed to ensure `optimum-intel` is upgraded to the latest version.
Expand All @@ -22,6 +24,12 @@ Discrete GPUs (dGPUs) usually provide better performance compared to CPUs. It is

See https://github.com/openvinotoolkit/openvino.genai/blob/master/src/README.md#supported-models for the list of supported models.

## Streaming

This Python example demonstrates custom detokenization with bufferization. The streamer receives integer tokens corresponding to each word or subword, one by one. If tokens are decoded individually, because of detokenize(tokenize(" a")) == "a" the resulting text will miss necessary spaces.
pavel-esir marked this conversation as resolved.
Show resolved Hide resolved

To address this, the detokenizer needs a larger context. We accumulate tokens in a tokens_cache buffer and decode multiple tokens together, adding the text to the streaming queue only when a complete decoded chunk is ready. We run a separate thread to print all new elements arriving in this queue from the generation pipeline. Each generated chunk of text is put into a synchronized queue, ensuring that all put and get operations are thread-safe and blocked until they can proceed.

### Troubleshooting

#### Unicode characters encoding error on Windows
Expand Down
133 changes: 125 additions & 8 deletions samples/python/multinomial_causal_lm/multinomial_causal_lm.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,120 @@

import argparse
import openvino_genai
import queue
import threading


def streamer(subword):
print(subword, end='', flush=True)
return False
class IterableStreamer(openvino_genai.StreamerBase):
"""
A custom streamer class for handling token streaming and detokenization with buffering.

Attributes:
tokenizer (Tokenizer): The tokenizer used for encoding and decoding tokens.
tokens_cache (list): A buffer to accumulate tokens for detokenization.
text_queue (Queue): A synchronized queue for storing decoded text chunks.
print_len (int): The length of the printed text to manage incremental decoding.
"""

def __init__(self, tokenizer):
"""
Initializes the IterableStreamer with the given tokenizer.

Args:
tokenizer (Tokenizer): The tokenizer to use for encoding and decoding tokens.
"""
super().__init__()
self.tokenizer = tokenizer
self.tokens_cache = []
self.text_queue = queue.Queue()
self.print_len = 0

def __iter__(self):
"""
Returns the iterator object itself.
"""
return self

def __next__(self):
"""
Returns the next value from the text queue.

Returns:
str: The next decoded text chunk.

Raises:
StopIteration: If there are no more elements in the queue.
"""
value = self.text_queue.get() # get() will be blocked until a token is available.
if value is None:
raise StopIteration
return value

def get_stop_flag(self):
"""
Checks whether the generation process should be stopped.

Returns:
bool: Always returns False in this implementation.
"""
return False

def put_word(self, word: str):
"""
Puts a word into the text queue.

Args:
word (str): The word to put into the queue.
"""
self.text_queue.put(word)

def put(self, token_id: int) -> bool:
"""
Processes a token and manages the decoding buffer. Adds decoded text to the queue.

Args:
token_id (int): The token_id to process.

Returns:
bool: True if generation should be stopped, False otherwise.
"""
self.tokens_cache.append(token_id)
text = self.tokenizer.decode(self.tokens_cache)

word = ''
if len(text) > self.print_len and '\n' == text[-1]:
# Flush the cache after the new line symbol.
word = text[self.print_len:]
self.tokens_cache = []
self.print_len = 0
elif len(text) >= 3 and text[-3:] == "�":
pavel-esir marked this conversation as resolved.
Show resolved Hide resolved
# Don't print incomplete text.
pass
elif len(text) > self.print_len:
# It is possible to have a shorter text after adding new token.
# Print to output only if text lengh is increaesed.
word = text[self.print_len:]
self.print_len = len(text)
self.put_word(word)

if self.get_stop_flag():
# When generation is stopped from streamer then end is not called, need to call it here manually.
self.end()
return True # True means stop generation
else:
return False # False means continue generation

def end(self):
"""
Flushes residual tokens from the buffer and puts a None value in the queue to signal the end.
"""
text = self.tokenizer.decode(self.tokens_cache)
if len(text) > self.print_len:
word = text[self.print_len:]
self.put_word(word)
self.tokens_cache = []
self.print_len = 0
self.put_word(None)


def main():
Expand All @@ -19,17 +128,25 @@ def main():

device = 'CPU' # GPU can be used as well
pipe = openvino_genai.LLMPipeline(args.model_dir, device)


text_print_streamer = IterableStreamer(pipe.get_tokenizer())
def token_printer():
# Getting next elements from iterable will be blocked until a new token is available.
for word in text_print_streamer:
print(word, end='', flush=True)
printer_thread = threading.Thread(target=token_printer, daemon=True)
printer_thread.start()

config = openvino_genai.GenerationConfig()
config.max_new_tokens = 100
config.do_sample = True
config.top_p = 0.9
config.top_k = 30

# Since the streamer is set, the results will
# be printed each time a new token is generated.
pipe.generate(args.prompt, config, streamer)

# Since the streamer is set, the results will be printed
# every time a new token is generated and put into the streamer queue.
pipe.generate(args.prompt, config, text_print_streamer)
printer_thread.join()

if '__main__' == __name__:
main()
43 changes: 43 additions & 0 deletions src/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ int main(int argc, char* argv[]) {
```

Streaming with a custom class:

C++ template for a stremer.
```cpp
#include "openvino/genai/streamer_base.hpp"
#include "openvino/genai/llm_pipeline.hpp"
Expand Down Expand Up @@ -210,6 +212,47 @@ int main(int argc, char* argv[]) {
}
```

Python template for a streamer.
```py
import openvino_genai as ov_genai

class CustomStreamer(ov_genai.StreamerBase):
def __init__(self, tokenizer):
super().__init__()
self.tokenizer = tokenizer
# Initialize a cache to store tokens
self.tokens_cache = []

def put(self, token_id) -> bool:
# Process a token ID and determine if the generation should stop.
# Rerturn a boolean flag indicating whether the generation should stop.
stop_flag = False

# Add the token to the cache and decode the tokens to get the text
self.tokens_cache.append(token_id)
text = self.tokenizer.decode(self.tokens_cache)

# Custom processing logic (if any)
# For example, you might want to stop generation if a certain condition is met
if some_condition:
stop_flag = True

return stop_flag

def end(self):
# Custom finalization logic (if any)
# For example, you might want to process the final text or clear the cache
final_text = self.tokenizer.decode(self.tokens_cache)
self.tokens_cache = []


pipe = ov_genai.LLMPipeline(model_path, "CPU")
custom_streamer = TextPrintStreamer(pipe.get_tokenizer())

pipe.generate("The Sun is yellow because", max_new_tokens=15, streamer=custom_streamer)
```
For fully implemented iterable CustomStreamer please refer to [multinomial_causal_lm](https://github.com/openvinotoolkit/openvino.genai/tree/releases/2024/3/samples/python/multinomial_causal_lm/README.md) sample.

### Performance Metrics

`openvino_genai.PerfMetrics` (referred as `PerfMetrics` for simplicity) is a structure that holds performance metrics for each generate call. `PerfMetrics` holds fields with mean and standard deviations for the following metrics:
Expand Down
Loading