-
Notifications
You must be signed in to change notification settings - Fork 18
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge commit 'refs/pull/18/head' of github.com:matatonic/openedai-vision
- Loading branch information
Showing
2 changed files
with
215 additions
and
38 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,174 @@ | ||
|
||
import numpy as np | ||
import torch | ||
import torchvision.transforms as T | ||
from PIL import Image | ||
from torchvision.transforms.functional import InterpolationMode | ||
from transformers import AutoModel, AutoTokenizer | ||
|
||
IMAGENET_MEAN = (0.485, 0.456, 0.406) | ||
IMAGENET_STD = (0.229, 0.224, 0.225) | ||
|
||
def build_transform(input_size): | ||
MEAN, STD = IMAGENET_MEAN, IMAGENET_STD | ||
transform = T.Compose([ | ||
T.Lambda(lambda img: img.convert('RGB') if img.mode != 'RGB' else img), | ||
T.Resize((input_size, input_size), interpolation=InterpolationMode.BICUBIC), | ||
T.ToTensor(), | ||
T.Normalize(mean=MEAN, std=STD) | ||
]) | ||
return transform | ||
|
||
def find_closest_aspect_ratio(aspect_ratio, target_ratios, width, height, image_size): | ||
best_ratio_diff = float('inf') | ||
best_ratio = (1, 1) | ||
area = width * height | ||
for ratio in target_ratios: | ||
target_aspect_ratio = ratio[0] / ratio[1] | ||
ratio_diff = abs(aspect_ratio - target_aspect_ratio) | ||
if ratio_diff < best_ratio_diff: | ||
best_ratio_diff = ratio_diff | ||
best_ratio = ratio | ||
elif ratio_diff == best_ratio_diff: | ||
if area > 0.5 * image_size * image_size * ratio[0] * ratio[1]: | ||
best_ratio = ratio | ||
return best_ratio | ||
|
||
def dynamic_preprocess(image, min_num=1, max_num=12, image_size=448, use_thumbnail=False): | ||
orig_width, orig_height = image.size | ||
aspect_ratio = orig_width / orig_height | ||
|
||
# calculate the existing image aspect ratio | ||
target_ratios = set( | ||
(i, j) for n in range(min_num, max_num + 1) for i in range(1, n + 1) for j in range(1, n + 1) if | ||
i * j <= max_num and i * j >= min_num) | ||
target_ratios = sorted(target_ratios, key=lambda x: x[0] * x[1]) | ||
|
||
# find the closest aspect ratio to the target | ||
target_aspect_ratio = find_closest_aspect_ratio( | ||
aspect_ratio, target_ratios, orig_width, orig_height, image_size) | ||
|
||
# calculate the target width and height | ||
target_width = image_size * target_aspect_ratio[0] | ||
target_height = image_size * target_aspect_ratio[1] | ||
blocks = target_aspect_ratio[0] * target_aspect_ratio[1] | ||
|
||
# resize the image | ||
resized_img = image.resize((target_width, target_height)) | ||
processed_images = [] | ||
for i in range(blocks): | ||
box = ( | ||
(i % (target_width // image_size)) * image_size, | ||
(i // (target_width // image_size)) * image_size, | ||
((i % (target_width // image_size)) + 1) * image_size, | ||
((i // (target_width // image_size)) + 1) * image_size | ||
) | ||
# split the image | ||
split_img = resized_img.crop(box) | ||
processed_images.append(split_img) | ||
assert len(processed_images) == blocks | ||
if use_thumbnail and len(processed_images) != 1: | ||
thumbnail_img = image.resize((image_size, image_size)) | ||
processed_images.append(thumbnail_img) | ||
return processed_images, target_aspect_ratio | ||
|
||
|
||
def dynamic_preprocess2(image, min_num=1, max_num=12, prior_aspect_ratio=None, image_size=448, use_thumbnail=False): | ||
orig_width, orig_height = image.size | ||
aspect_ratio = orig_width / orig_height | ||
|
||
# calculate the existing image aspect ratio | ||
target_ratios = set( | ||
(i, j) for n in range(min_num, max_num + 1) for i in range(1, n + 1) for j in range(1, n + 1) if | ||
i * j <= max_num and i * j >= min_num) | ||
target_ratios = sorted(target_ratios, key=lambda x: x[0] * x[1]) | ||
new_target_ratios = [] | ||
for i in target_ratios: | ||
if prior_aspect_ratio[0]%i[0] or prior_aspect_ratio[1]%i[1]: | ||
new_target_ratios.append(i) | ||
else: | ||
continue | ||
# find the closest aspect ratio to the target | ||
target_aspect_ratio = find_closest_aspect_ratio( | ||
aspect_ratio, new_target_ratios, orig_width, orig_height, image_size) | ||
# calculate the target width and height | ||
target_width = image_size * target_aspect_ratio[0] | ||
target_height = image_size * target_aspect_ratio[1] | ||
blocks = target_aspect_ratio[0] * target_aspect_ratio[1] | ||
|
||
# resize the image | ||
resized_img = image.resize((target_width, target_height)) | ||
processed_images = [] | ||
for i in range(blocks): | ||
box = ( | ||
(i % (target_width // image_size)) * image_size, | ||
(i // (target_width // image_size)) * image_size, | ||
((i % (target_width // image_size)) + 1) * image_size, | ||
((i // (target_width // image_size)) + 1) * image_size | ||
) | ||
# split the image | ||
split_img = resized_img.crop(box) | ||
processed_images.append(split_img) | ||
assert len(processed_images) == blocks | ||
if use_thumbnail and len(processed_images) != 1: | ||
thumbnail_img = image.resize((image_size, image_size)) | ||
processed_images.append(thumbnail_img) | ||
return processed_images | ||
|
||
def load_image(image, input_size=448, min_num=1, max_num=12): | ||
image = image.convert('RGB') | ||
transform = build_transform(input_size=input_size) | ||
images, target_aspect_ratio = dynamic_preprocess(image, image_size=input_size, use_thumbnail=True, min_num=min_num, max_num=max_num) | ||
pixel_values = [transform(image) for image in images] | ||
pixel_values = torch.stack(pixel_values) | ||
return pixel_values, target_aspect_ratio | ||
|
||
def load_image2(image, input_size=448, min_num=1, max_num=12, target_aspect_ratio=None): | ||
image = image.convert('RGB') | ||
transform = build_transform(input_size=input_size) | ||
images = dynamic_preprocess2(image, image_size=input_size, use_thumbnail=True, min_num=min_num, max_num=max_num, prior_aspect_ratio=target_aspect_ratio) | ||
pixel_values = [transform(image) for image in images] | ||
pixel_values = torch.stack(pixel_values) | ||
return pixel_values | ||
|
||
|
||
from vision_qna import * | ||
|
||
# mx262/MiniMonkey | ||
import transformers | ||
transformers.logging.set_verbosity_error() | ||
|
||
class VisionQnA(VisionQnABase): | ||
model_name: str = "minimonkey" | ||
format: str = '' # phi15-ish | ||
#vision_layers: List[str] = ["vision", "vision_tower", "resampler", "visual", "in_proj","out_proj","c_fc","c_proj"] | ||
|
||
def __init__(self, model_id: str, device: str, device_map: str = 'auto', extra_params = {}, format = None): | ||
super().__init__(model_id, device, device_map, extra_params, format) | ||
|
||
self.tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=self.params.get('trust_remote_code', False)) | ||
self.model = AutoModel.from_pretrained(**self.params).eval() | ||
|
||
self.loaded_banner() | ||
|
||
async def stream_chat_with_images(self, request: ImageChatRequest) -> AsyncGenerator[str, None]: | ||
query, history, images, system_message = await prompt_history_images_system_from_messages( | ||
request.messages, img_tok='', url_handler=url_to_image) | ||
|
||
# set the max number of tiles in `max_num` | ||
pixel_values, target_aspect_ratio = load_image(images[0], min_num=4, max_num=12) | ||
pixel_values = pixel_values.to(torch.bfloat16).to(self.model.device) | ||
pixel_values2 = load_image2(images[0], min_num=3, max_num=7, target_aspect_ratio=target_aspect_ratio) | ||
pixel_values2 = pixel_values2.to(torch.bfloat16).to(self.model.device) | ||
pixel_values = torch.cat([pixel_values2[:-1], pixel_values[:-1], pixel_values2[-1:]], 0) | ||
|
||
generation_config = dict(do_sample=False, max_new_tokens=512) | ||
|
||
answer, history = self.model.chat(self.tokenizer, pixel_values, target_aspect_ratio, query, generation_config, history=None, return_history=True) | ||
|
||
if isinstance(answer, str): | ||
answer = [answer] | ||
|
||
for new_text in answer: | ||
if isinstance(new_text, str): | ||
yield new_text |
Oops, something went wrong.