[BUG] ExLlamaV2DynamicGenerator class is not multiple threads supported #690

Open
3 tasks done
UTSAV-44 opened this issue Nov 29, 2024 · 4 comments
Labels
bug Something isn't working

Comments

@UTSAV-44

OS

Linux

GPU Library

CUDA 12.x

Python version

3.12

Pytorch version

2.4.1

Model

No response

Describe the bug

To generate output, I use the generate function of the ExLlamaV2DynamicGenerator class. My implementation calls this method from parallel threads. When I run it in 3 parallel threads, a response is generated for only one thread and the other threads fail with the following error:
KeyError: 16
2024-11-26 05:57:16,799 Exception on /v1/via_prod_local_llm_in_request [POST]
Traceback (most recent call last):
File "/usr/local/lib/python3.11/dist-packages/flask/app.py", line 1473, in wsgi_app
response = self.full_dispatch_request()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/dist-packages/flask/app.py", line 882, in full_dispatch_request
rv = self.handle_user_exception(e)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/dist-packages/flask/app.py", line 880, in full_dispatch_request
rv = self.dispatch_request()
^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/dist-packages/flask/app.py", line 865, in dispatch_request
return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args) # type: ignore[no-any-return]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/app/server_exllama.py", line 106, in run_via_local_llm_in_request
output = mihup_llm_module.run_mihup_llm_inference(call_transcript=call_transcript_str,
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/app/exllama_inference.py", line 143, in run_mihup_llm_inference
outputs = self.models[model_index].response_generator(prompts, filters, use_case_ids,self.universal_filter_map)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/app/exllama_inference.py", line 53, in response_generator
outputs = self.generator.generate(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/dist-packages/exllamav2/generator/dynamic.py", line 664, in generate
idx = order[r["serial"]]
~~~~~^^^^^^^^^^^^^
KeyError: 16

Reproduction steps

The code snippet is below:
from threading import Lock
from typing import Any, List


class LockedList:
    def __init__(self, objects: List[Any]):
        self.objects = objects
        self.locks = [Lock() for _ in objects]
        self.last_used_index = -1
        self.access_lock = Lock()  # Add global access lock

    def access_with_lock(self, index: int):
        """
        Access an object in the list with proper locking.
        Acquires the lock for the object and returns the object;
        the caller is responsible for releasing the lock.
        """
        if index < 0 or index >= len(self.objects):
            raise IndexError("Index out of bounds.")

        lock = self.locks[index]
        acquired = lock.acquire(blocking=True, timeout=10.0)  # Add timeout
        if not acquired:
            raise TimeoutError(f"Failed to acquire lock for model {index}")

        print(f"Accessing object at index {index} with lock.")
        return self.objects[index]

    def get_new_index(self) -> int:
        with self.access_lock:  # Use global lock for index selection
            start_index = (self.last_used_index + 1) % len(self.objects)

            # First try: Look for first available model
            for offset in range(len(self.objects)):
                index = (start_index + offset) % len(self.objects)
                if not self.locks[index].locked():
                    self.last_used_index = index
                    return index

            # If all models are busy, wait for the least recently used one
            least_recent_index = min(range(len(self.objects)),
                                     key=lambda i: getattr(self.objects[i], 'last_access_time', 0))
            return least_recent_index

from exllamav2 import ExLlamaV2, ExLlamaV2Cache_Q4, ExLlamaV2Config, ExLlamaV2Tokenizer
from exllamav2.generator import ExLlamaV2DynamicGenerator, ExLlamaV2Sampler


class ThreadSafeExLlamaV2:
    def __init__(self, model_path: str, max_seq_len: int = 256 * 96):
        self.config = ExLlamaV2Config(model_path)
        self.model = ExLlamaV2(self.config)
        self.cache = ExLlamaV2Cache_Q4(self.model, max_seq_len=max_seq_len, lazy=True)
        self.model.load_autosplit(self.cache)
        self.tokenizer = ExLlamaV2Tokenizer(self.config)

        self.generator = ExLlamaV2DynamicGenerator(
            model=self.model,
            cache=self.cache,
            tokenizer=self.tokenizer
        )

        # self.generator_lock = Lock()

        self.gen_settings = ExLlamaV2Sampler.Settings(
            temperature=0.0,  # Set to 0 for deterministic output
            top_k=1,  # Only consider the most likely token
            top_p=1.0,  # No nucleus sampling
            token_repetition_penalty=1.0  # No repetition penalty
        )

        self.generator.warmup()

    def cache_debug(self):
        """Debug helper to print cache memory addresses"""
        cache_tensors = self.cache.all_tensors()
        print(f"Cache tensor addresses for model instance {id(self)}:")
        for i, tensor in enumerate(cache_tensors):
            print(f"  Tensor {i}: {hex(tensor.data_ptr())}")

    def generate_response_open_usecase(self, prompts, max_new_tokens):
        self.cache.reset()
        outputs = self.generator.generate(
            prompt=prompts,
            add_bos=True,
            stop_conditions=get_llama3_stop_conditions(self.tokenizer),
            gen_settings=self.gen_settings,
            completion_only=True,
            encode_special_tokens=True,
            max_new_tokens=max_new_tokens,
        )
        return outputs

import json
from datetime import datetime
from typing import List, Tuple

from exllamav2.generator.filters import ExLlamaV2PrefixFilter
from lmformatenforcer.integrations.exllamav2 import ExLlamaV2TokenEnforcerFilter


class MihupExLlamaLLM:
    def __init__(self, thread_count: int):
        thread_count = max(1, min(thread_count, 4))
        print(f"Initializing {thread_count} model instances")

        models = []
        for i in range(thread_count):
            try:
                model = ThreadSafeExLlamaV2("/app/Llama-3.1-8B-Instruct-exl2")
                model.last_access_time = 0  # Add timestamp tracking
                models.append(model)
                print(f"Successfully initialized model {i + 1}")
            except Exception as e:
                print(f"Failed to initialize model {i + 1}: {str(e)}")
                raise

        self.model_locked_list = LockedList(models)
        self.universal_filter_map = {}
        self.universal_filter_map_lock = Lock()

    def run_mihup_llm_inference(self, call_transcript: str, prompt_tuples: List[Tuple]) -> List[dict]:
        try:
            # Get model index and acquire lock
            model_index = self.model_locked_list.get_new_index()
            inner_model = None

            try:
                inner_model = self.model_locked_list.access_with_lock(model_index)
                inner_model.last_access_time = datetime.now()

                # Process transcript and prepare prompts
                common_transcript = format_transcript_text(call_transcript)
                prompts = []
                filters = []
                use_case_ids = []

                used_previous_filter_data = False
                for upper_tuple in prompt_tuples:
                    use_case_id = upper_tuple[1]
                    use_case_ids.append(use_case_id)
                    p = upper_tuple[0]
                    prompt_str = p[0]
                    prompt_question_combined = format_llama3_prompt(mihup_system_prompt, common_transcript + prompt_str)
                    prompts.append(prompt_question_combined)
                    filter_schema_parser = p[1]

                    # Handle filters with proper locking
                    with self.universal_filter_map_lock:
                        if use_case_id not in self.universal_filter_map:
                            print(f"Creating new filter for model {model_index}")
                            self.universal_filter_map[use_case_id] = [
                                ExLlamaV2TokenEnforcerFilter(filter_schema_parser, inner_model.tokenizer),
                                ExLlamaV2PrefixFilter(inner_model.model, inner_model.tokenizer, ["{", " {"])
                            ]
                        else:
                            used_previous_filter_data = True
                        filters.append(self.universal_filter_map[use_case_id])

                if used_previous_filter_data:
                    print(f"Used existing filter for model {model_index}")

                # Generate outputs
                print(f"Processing with model {model_index}")

                # Add debug prints around cache reset
                print(f"\nResetting cache for model {model_index}")
                inner_model.cache_debug()

                inner_model.cache.reset()

                print(f"\nAfter cache reset for model {model_index}")
                inner_model.cache_debug()

                outputs = inner_model.generator.generate(
                    prompt=prompts,
                    filters=filters,
                    filter_prefer_eos=True,
                    max_new_tokens=2048,
                    add_bos=True,
                    stop_conditions=get_llama3_stop_conditions(inner_model.tokenizer),
                    gen_settings=inner_model.gen_settings,
                    completion_only=True,
                    encode_special_tokens=True,
                )

                # Process outputs
                final_output = []
                skipped_index = []
                for i in range(len(outputs)):
                    try:
                        output_json = json.loads(outputs[i])
                        final_output.append(output_json)
                    except ValueError as e:
                        skipped_index.append(i)
                        print(f"Error parsing output {i}: {str(e)}")
                        print("Raw output:", outputs[i])

                # Reset filter states
                # with self.universal_filter_map_lock:
                #     for idx, use_case_id in enumerate(use_case_ids):
                #         if use_case_id in self.universal_filter_map:
                #             for filter in self.universal_filter_map[use_case_id]:
                #                 filter.reset_state()

                # Add use case IDs to output
                for idx in range(len(final_output)):
                    if idx not in skipped_index:
                        final_output[idx]['use_case_id'] = use_case_ids[idx]

                return final_output

            finally:
                if inner_model is not None:
                    self.model_locked_list.locks[model_index].release()
                    print(f"Released lock for model {model_index}")

        except Exception as e:
            print(f"Error in run_mihup_llm_inference: {str(e)}")
            raise

Expected behavior

Previously, when I ran the server using gunicorn with a varying worker count, it worked very well. After I switched to a varying thread count, it stopped working.

Logs

No response

Additional context

No response

Acknowledgements

  • I have looked for similar issues before submitting this one.
  • I understand that the developers have lives and my issue will be answered when possible.
  • I understand the developers of this program are human, and I will ask my questions politely.
UTSAV-44 added the bug label Nov 29, 2024
@turboderp
Member

I'm unsure what you're trying to achieve.

The generator is extremely stateful and inherently single-threaded. It utilizes batching to run multiple requests concurrently, not threading. If you want threads to be able to start requests at any time, you'll need a single server thread calling the generator and a dispatch mechanism to distribute results back to clients.
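
One way to picture the "single server thread plus dispatch mechanism" arrangement is the minimal sketch below. It is not part of exllamav2: `GeneratorDispatcher` and `generate_fn` are hypothetical names, and `generate_fn` stands in for whatever callable wraps the generator. Client threads only enqueue work and wait on a future; a single worker thread is the only one that ever calls the generator.

```python
import queue
import threading
from concurrent.futures import Future


class GeneratorDispatcher:
    """Funnels calls from many threads onto one thread that owns the generator."""

    def __init__(self, generate_fn):
        # generate_fn is a stand-in for a callable such as generator.generate.
        self._generate_fn = generate_fn
        self._requests = queue.Queue()
        self._worker = threading.Thread(target=self._serve, daemon=True)
        self._worker.start()

    def _serve(self):
        # Only this thread ever invokes generate_fn.
        while True:
            item = self._requests.get()
            if item is None:  # shutdown sentinel
                break
            future, args, kwargs = item
            try:
                future.set_result(self._generate_fn(*args, **kwargs))
            except Exception as exc:
                # Hand the failure back to the client that made the request.
                future.set_exception(exc)

    def submit(self, *args, **kwargs):
        """Callable from any client thread; returns a Future for the result."""
        future = Future()
        self._requests.put((future, args, kwargs))
        return future

    def shutdown(self):
        self._requests.put(None)
        self._worker.join()
```

A request handler would then call something like `dispatcher.submit(prompt=prompts).result()` instead of calling the generator directly. This sketch processes one request at a time; it does not attempt to merge queued requests into a batch.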

The specific error you're getting would occur if the job queue is manipulated in the middle of the generate function, i.e. the generation loop starts getting output it wasn't expecting. Can you confirm that the access_with_lock function is working as intended? Or maybe the generator is being used from some other thread, maybe calling that generate_response_open_usecase function without locking?
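
To check whether calls really never overlap, a small diagnostic wrapper can help. The sketch below is an assumption-laden illustration, not an exllamav2 feature: `ConcurrencyGuard` is a hypothetical helper that raises as soon as a second thread enters the wrapped callable while a first call is still running.

```python
import threading


class ConcurrencyGuard:
    """Wraps a callable and raises if two threads are inside it at once."""

    def __init__(self, fn):
        self._fn = fn
        self._busy = threading.Lock()

    def __call__(self, *args, **kwargs):
        # Non-blocking acquire: failure means another call is still running.
        if not self._busy.acquire(blocking=False):
            raise RuntimeError(
                f"overlapping call from thread {threading.current_thread().name}"
            )
        try:
            return self._fn(*args, **kwargs)
        finally:
            self._busy.release()
```

Wrapping the call site, for example `guarded = ConcurrencyGuard(inner_model.generator.generate)` and then calling `guarded(...)`, would turn a silent overlap into an immediate, attributable error.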

@UTSAV-44
Author

UTSAV-44 commented Dec 2, 2024

I am attempting to generate three responses in parallel by calling the inner_model.generator.generate function. However, I observed that only one thread successfully returns a response, while the others result in errors. I have also created three instances of the ExLlamaV2DynamicGenerator class, as the generate function is being invoked three times.

model = IsolatedExLlamaV2("/app/Llama-3.1-8B-Instruct-exl2")
model.last_access_time = 0  # Add timestamp tracking
models.append(model)
print(f"Successfully initialized model {i + 1}")

class IsolatedExLlamaV2:
    def __init__(self, model_path: str, max_seq_len: int = 256 * 96):
        self.config = ExLlamaV2Config(model_path)
        self.model = ExLlamaV2(self.config)
        self.tokenizer = ExLlamaV2Tokenizer(self.config)
        # Create isolated cache for this instance
        self.cache = ExLlamaV2Cache_Q4(
            self.model,
            max_seq_len=max_seq_len,
            lazy=True
        )

        # Load model with isolated cache
        self.model.load_autosplit(self.cache)

        # Create isolated generator with its own memory space
        self.generator = ExLlamaV2DynamicGenerator(
            model=self.model,
            cache=self.cache,
            tokenizer=self.tokenizer,
            max_batch_size=1,  # Limit batch size per instance
            paged=True
        )
        self.gen_settings = ExLlamaV2Sampler.Settings(
            temperature=0.0,  # Set to 0 for deterministic output
            top_k=1,  # Only consider the most likely token
            top_p=1.0,  # No nucleus sampling
            token_repetition_penalty=1.0  # No repetition penalty
        )

        self.generator.warmup()

The access_with_lock function is working as intended. I am not using the generate_response_open_usecase function anyway.

@UTSAV-44
Author

UTSAV-44 commented Dec 2, 2024

@turboderp The reason I initialize 3 models with their own separate inference pipelines, rather than using batch inference, is a particular requirement of mine. Each inference has a multi-stage structured output requirement, and I set a large context at each stage. Since inference supports page caching, I assume that even though I set the same context again and again as part of the prompt, the underlying mechanism reuses the previous state and removes most of the redundant processing. Only after all the stages are complete do I reset the cache explicitly.

The next issue is how to cache the structured output filter (ExLlamaV2TokenEnforcerFilter) across these 3 instances. If I launch the 3 instances as separate Python processes, there is no issue, except that I also need 3 instances of the structured output filter, which increases my system RAM requirement. To avoid this, I am building a pub/sub mechanism and launching 3 such inference instances from a single thread, so that all instances share the same filter memory. The issue I now see is that parallel inferences somehow corrupt each other's output. I have printed the cache tensor memory pointers of each model, and they do point to different locations. I am still not sure what is implicitly being shared across the models to cause this. Could sharing the ExLlamaV2TokenEnforcerFilter itself be the culprit?

@turboderp
Member

> Each inference has a multi-stage structured output requirement, and I set a large context at each stage. Since inference supports page caching, I assume that even though I set the same context again and again as part of the prompt, the underlying mechanism reuses the previous state and removes most of the redundant processing. Only after all the stages are complete do I reset the cache explicitly.

You are correct that the generator will reuse keys/values from the previous context if the new context shares a prefix with it. However, it remembers more than one context, so if I'm understanding you correctly, you'd still be better served by a single instance with a larger cache. Pages are only evicted from the cache when it fills up, and there should be no need to manually clear it.
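
As a rough mental model of prefix reuse (not the library's actual implementation), the sketch below counts how many whole cache pages at the start of a new prompt match a previously seen one. The function name `reusable_pages` is hypothetical, and the page size of 256 tokens is an assumption made for illustration.

```python
def reusable_pages(cached_ids, new_ids, page_size=256):
    """Count whole pages at the start of new_ids that match cached_ids."""
    common = 0
    for a, b in zip(cached_ids, new_ids):
        if a != b:
            break
        common += 1
    # Only pages that are completely covered by the shared prefix count.
    return common // page_size
```

Under this model, a large shared context followed by a different stage-specific question still lets most of the prompt be served from the cache, which is why keeping the cache populated tends to help more than resetting it.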

Now, there still shouldn't be any global state causing the issues you're seeing, if I'm understanding correctly. I.e. two separate instances of model+cache+generator should be able to run across two separate threads (they may launch child threads, but still without any global state). If you had two generators referencing the same model instance from separate threads, that would cause issues, perhaps.
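
A quick way to rule out accidental sharing between instances is an identity check over their components. The sketch below is a hypothetical helper; the attribute names `model`, `cache`, `generator` and `tokenizer` are taken from the wrapper class posted earlier in this thread.

```python
from itertools import combinations


def shared_components(instances, attrs=("model", "cache", "generator", "tokenizer")):
    """Return (i, j, attr) triples where two instances hold the same object."""
    shared = []
    for (i, a), (j, b) in combinations(enumerate(instances), 2):
        for attr in attrs:
            # `is` compares object identity, not equality.
            if getattr(a, attr) is getattr(b, attr):
                shared.append((i, j, attr))
    return shared
```

An empty result for the list of wrapper instances would suggest that no model, cache, generator or tokenizer object is referenced by more than one instance. It says nothing about other shared objects, such as filters.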

The specific exception you're getting, though, that happens because one generator started a new job while it was in a generation loop and not expecting any new jobs to start. So I'm not sure how to diagnose, really. I'm a little curious how you're creating the model, cache and generator instances.

For structured generation, you should be able to reuse parts of the filter across threads, but the filter itself is a stateful object that needs an instance per job.
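
A minimal sketch of that separation, assuming the reusable part is something like the schema parser and the stateful part is the filter object built from it. `FilterRegistry` and `build_filter` are hypothetical names; which parts of a real filter are safe to share is not specified in this thread.

```python
import threading


class FilterRegistry:
    """Caches the reusable part per use case; builds a fresh filter per job."""

    def __init__(self, build_filter):
        # build_filter(shared_part) must return a NEW stateful filter each call.
        self._build_filter = build_filter
        self._shared = {}
        self._lock = threading.Lock()

    def register(self, use_case_id, shared_part):
        # Keep the first registered shared part for each use case.
        with self._lock:
            self._shared.setdefault(use_case_id, shared_part)

    def new_filters(self, use_case_id):
        with self._lock:
            shared_part = self._shared[use_case_id]
        # Construct outside the lock so slow construction does not block others.
        return [self._build_filter(shared_part)]
```

In the code posted above, `build_filter` could be a function that constructs `ExLlamaV2TokenEnforcerFilter(filter_schema_parser, tokenizer)` from a cached parser, so every prompt in every `generate` call gets its own filter objects instead of a list stored in `universal_filter_map`.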
