make streamer iterable, move to multinomial sample
pavel-esir committed Aug 5, 2024
1 parent 8a38316 commit f006764
Showing 3 changed files with 108 additions and 54 deletions.
8 changes: 8 additions & 0 deletions samples/python/multinomial_causal_lm/README.md
@@ -2,6 +2,8 @@

This example showcases inference of text-generation Large Language Models (LLMs): `chatglm`, `LLaMA`, `Qwen` and other models with the same signature. The application deliberately has few configuration options, to encourage the reader to explore and modify the source code. For example, change the device for inference to GPU. The sample features `ov::genai::LLMPipeline` and configures it to run the random sampling algorithm. There is also a Jupyter [notebook](https://github.com/openvinotoolkit/openvino_notebooks/tree/latest/notebooks/llm-chatbot) which provides an example of an LLM-powered chatbot in Python.

This sample also contains an example implementation of an iterable streamer with buffering.

## Download and convert the model and tokenizers

The `--upgrade-strategy eager` option is needed to ensure `optimum-intel` is upgraded to the latest version.
@@ -22,6 +24,12 @@ Discrete GPUs (dGPUs) usually provide better performance compared to CPUs. It is

See https://github.com/openvinotoolkit/openvino.genai/blob/master/src/README.md#supported-models for the list of supported models.

## Streaming

This Python example demonstrates custom detokenization with buffering. The streamer receives integer tokens, each corresponding to a word or subword, one at a time. If tokens are decoded individually, the resulting text will miss necessary spaces, because `detokenize(tokenize(" a")) == "a"`.
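
The space loss can be reproduced with a toy detokenizer. This is a minimal sketch under stated assumptions: `VOCAB` and `decode` are hypothetical stand-ins, not part of `openvino_genai`; they only mimic the behavior of dropping the leading space of whatever sequence is decoded.

```python
# Hypothetical toy vocabulary; "▁" marks the start of a new word.
VOCAB = {0: "▁The", 1: "▁Sun", 2: "▁is", 3: "▁yel", 4: "low"}


def decode(token_ids):
    """Toy detokenizer (assumption): drops the leading space of the sequence."""
    text = "".join(VOCAB[t] for t in token_ids).replace("▁", " ")
    return text[1:] if text.startswith(" ") else text


tokens = [0, 1, 2, 3, 4]

# Decoding each token on its own strips the space in front of every word.
one_by_one = "".join(decode([t]) for t in tokens)

# Decoding the whole sequence keeps the spaces between words.
together = decode(tokens)

print(one_by_one)  # TheSunisyellow
print(together)    # The Sun is yellow
```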

To address this, the detokenizer needs a larger context. We accumulate tokens in a `tokens_cache` buffer and decode multiple tokens together, adding the text to the streaming queue only when a complete decoded chunk is ready. A separate thread prints every new element that arrives in this queue from the generation pipeline. Each generated chunk of text is put into a synchronized queue, so all put and get operations are thread-safe and block until they can proceed.
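
The buffering and queue handoff can be sketched without the pipeline. The example below is an illustration only: `BufferedStreamer`, `VOCAB` and `decode` are hypothetical names, the toy detokenizer stands in for the real tokenizer, and the plain `for` loop stands in for token generation. `None` is used as the end-of-stream sentinel, as in the sample.

```python
import queue
import threading

# Hypothetical toy vocabulary; "▁" marks the start of a new word.
VOCAB = {0: "▁The", 1: "▁Sun", 2: "▁is", 3: "▁yel", 4: "low"}


def decode(token_ids):
    """Toy detokenizer (assumption): drops the leading space of the sequence."""
    text = "".join(VOCAB[t] for t in token_ids).replace("▁", " ")
    return text[1:] if text.startswith(" ") else text


class BufferedStreamer:
    """Accumulates token ids and queues only the newly decoded suffix."""

    def __init__(self):
        self.tokens_cache = []
        self.print_len = 0
        self.text_queue = queue.Queue()

    def __iter__(self):
        return self

    def __next__(self):
        # get() blocks until a chunk or the None sentinel arrives.
        chunk = self.text_queue.get()
        if chunk is None:
            raise StopIteration
        return chunk

    def put(self, token_id):
        self.tokens_cache.append(token_id)
        text = decode(self.tokens_cache)
        if len(text) > self.print_len:
            self.text_queue.put(text[self.print_len:])
            self.print_len = len(text)
        return False  # False means continue generation

    def end(self):
        self.text_queue.put(None)  # signal the consumer to stop iterating


streamer = BufferedStreamer()
received = []
# The consumer thread drains the queue by iterating over the streamer.
consumer = threading.Thread(target=lambda: received.extend(streamer))
consumer.start()

for token_id in [0, 1, 2, 3, 4]:  # stands in for the generation loop
    streamer.put(token_id)
streamer.end()
consumer.join()

print("".join(received))  # The Sun is yellow
```

Because every chunk is the suffix of a decode over the whole cache, the spaces between words survive even though the consumer sees the text piece by piece.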

### Troubleshooting

#### Unicode characters encoding error on Windows
83 changes: 75 additions & 8 deletions samples/python/multinomial_causal_lm/multinomial_causal_lm.py
@@ -4,11 +4,70 @@

import argparse
import openvino_genai
import queue
import threading


def streamer(subword):
    print(subword, end='', flush=True)
    return False
class IterableStreamer(openvino_genai.StreamerBase):
    def __init__(self, tokenizer):
        super().__init__()
        self.tokenizer = tokenizer
        self.tokens_cache = []
        self.text_queue = queue.Queue()
        self.print_len = 0

    def __iter__(self):
        return self

    def __next__(self):
        # get() blocks until a chunk of text is available.
        value = self.text_queue.get()
        if value is None:
            raise StopIteration
        return value

    def get_stop_flag(self):
        return False

    def put_word(self, word: str):
        self.text_queue.put(word)

    def put(self, token_id: int) -> bool:
        self.tokens_cache.append(token_id)
        text = self.tokenizer.decode(self.tokens_cache)

        word = ''
        if len(text) > self.print_len and '\n' == text[-1]:
            # Flush the cache after the new line symbol.
            word = text[self.print_len:]
            self.tokens_cache = []
            self.print_len = 0
        elif text and text[-1] == '\ufffd':
            # Don't print incomplete text: a trailing U+FFFD replacement
            # character means the last character is not fully decoded yet.
            # It is a single code point in a Python str, so compare text[-1].
            pass
        elif len(text) > self.print_len:
            # It is possible to have a shorter text after adding a new token.
            # Print to output only if the text length has increased.
            word = text[self.print_len:]
            self.print_len = len(text)
        self.put_word(word)

        if self.get_stop_flag():
            # When generation is stopped from the streamer, end() is not called, so call it here manually.
            self.end()
            return True # True means stop generation
        else:
            return False # False means continue generation

    def end(self):
        # Flush residual tokens from the buffer.
        text = self.tokenizer.decode(self.tokens_cache)
        if len(text) > self.print_len:
            word = text[self.print_len:]
            self.put_word(word)
            self.tokens_cache = []
            self.print_len = 0
        self.put_word(None)


def main():
@@ -19,17 +78,25 @@ def main():

    device = 'CPU' # GPU can be used as well
    pipe = openvino_genai.LLMPipeline(args.model_dir, device)


    text_print_streamer = IterableStreamer(pipe.get_tokenizer())
    def token_printer():
        # Getting next elements from iterable will be blocked until a new token is available.
        for word in text_print_streamer:
            print(word, end='', flush=True)
    printer_thread = threading.Thread(target=token_printer, daemon=True)
    printer_thread.start()

    config = openvino_genai.GenerationConfig()
    config.max_new_tokens = 100
    config.do_sample = True
    config.top_p = 0.9
    config.top_k = 30

    # Since the streamer is set, the results will
    # be printed each time a new token is generated.
    pipe.generate(args.prompt, config, streamer)


    # Since the streamer is set, the results will be printed
    # every time a new token is generated and put into the streamer queue.
    pipe.generate(args.prompt, config, text_print_streamer)
    printer_thread.join()

if '__main__' == __name__:
    main()
71 changes: 25 additions & 46 deletions src/README.md
@@ -178,6 +178,8 @@ int main(int argc, char* argv[]) {
```

Streaming with a custom class:

C++ template for a streamer.
```cpp
#include "openvino/genai/streamer_base.hpp"
#include "openvino/genai/llm_pipeline.hpp"
@@ -210,69 +212,46 @@ int main(int argc, char* argv[]) {
}
```
This Python example demonstrates custom detokenization with buffering. The streamer receives
integer tokens corresponding to each word or subword, one by one. If tokens are decoded individually,
subwords will not be concatenated correctly, and the resulting text will lack appropriate spaces.
To address this, we accumulate tokens in a tokens_cache buffer and decode multiple tokens together,
returning the text only when a complete decoded chunk is ready.
Python template for a streamer.
```py
import openvino_genai as ov_genai
class TextPrintStreamer(ov_genai.StreamerBase):
class CustomStreamer(ov_genai.StreamerBase):
    def __init__(self, tokenizer):
        super().__init__()
        self.tokenizer = tokenizer
        # Initialize a cache to store tokens
        self.tokens_cache = []
        self.print_len = 0
    def get_stop_flag(self):
        return False
    def process_word(self, word: str):
        print(word, end='', flush=True)
    def put(self, token_id):
    def put(self, token_id) -> bool:
        # Process a token ID and determine if the generation should stop.
        # Return a boolean flag indicating whether the generation should stop.
        stop_flag = False
        # Add the token to the cache and decode the tokens to get the text
        self.tokens_cache.append(token_id)
        text = self.tokenizer.decode(self.tokens_cache)
        word = ''
        if len(text) > self.print_len and '\n' == text[-1]:
            # Flush the cache after the new line symbol.
            word = text[self.print_len:]
            self.tokens_cache = []
            self.print_len = 0
        elif text and text[-1] == '\ufffd':
            # Don't print incomplete text (trailing U+FFFD replacement character).
            pass
        elif len(text) > self.print_len:
            # It is possible to have a shorter text after adding a new token.
            # Print to output only if the text length has increased.
            word = text[self.print_len:]
            self.print_len = len(text)
        self.process_word(word)
        if self.get_stop_flag():
            # When generation is stopped from the streamer, end() is not called, so call it here manually.
            self.end()
            return True # True means stop generation
        else:
            return False # False means continue generation
        # Custom processing logic (if any)
        # For example, you might want to stop generation if a certain condition is met
        if some_condition:
            stop_flag = True
        return stop_flag
    def end(self):
        # Flush residual tokens from the buffer.
        text = self.tokenizer.decode(self.tokens_cache)
        if len(text) > self.print_len:
            word = text[self.print_len:]
            self.process_word(word)
        self.tokens_cache = []
        self.print_len = 0
        # Custom finalization logic (if any)
        # For example, you might want to process the final text or clear the cache
        final_text = self.tokenizer.decode(self.tokens_cache)
        self.tokens_cache = []
pipe = ov_genai.LLMPipeline(model_path, "CPU")
text_print_streamer = TextPrintStreamer(pipe.get_tokenizer())
custom_streamer = CustomStreamer(pipe.get_tokenizer())
pipe.generate("The Sun is yellow because", max_new_tokens=15, streamer=text_print_streamer)
pipe.generate("The Sun is yellow because", max_new_tokens=15, streamer=custom_streamer)
```
For a fully implemented iterable streamer, please refer to the [multinomial_causal_lm](https://github.com/openvinotoolkit/openvino.genai/tree/releases/2024/3/samples/python/multinomial_causal_lm/README.md) sample.
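
The stop-flag contract of `put()` can be exercised without the library. The following is a minimal sketch under assumptions: `TokenBudgetStreamer` and `fake_generate` are hypothetical, and `fake_generate` only imitates the behavior described in the code comments here, namely that `end()` is not called when the streamer itself stops generation.

```python
class TokenBudgetStreamer:
    """Hypothetical streamer that stops generation after max_tokens tokens."""

    def __init__(self, max_tokens):
        self.max_tokens = max_tokens
        self.tokens_cache = []
        self.ended = False

    def put(self, token_id) -> bool:
        self.tokens_cache.append(token_id)
        if len(self.tokens_cache) >= self.max_tokens:
            # The driver will not call end() after a stop, so call it here.
            self.end()
            return True  # True means stop generation
        return False  # False means continue generation

    def end(self):
        self.ended = True


def fake_generate(token_ids, streamer):
    """Hypothetical driver loop standing in for the pipeline's generate()."""
    for token_id in token_ids:
        if streamer.put(token_id):
            return  # stopped by the streamer; end() is not called here
    streamer.end()


streamer = TokenBudgetStreamer(max_tokens=3)
fake_generate([10, 11, 12, 13, 14], streamer)
print(streamer.tokens_cache)  # [10, 11, 12]
```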

### Performance Metrics
