GTA1-32B

1.4K
5
32.0B
1 language
license:mit
by
Salesforce
Image Model
OTHER
32B params
New
1K downloads
Early-stage
Edge AI:
Mobile
Laptop
Server
72GB+ RAM
Mobile
Laptop
Server
Quick Summary

Reinforcement learning (RL) (e.

Device Compatibility

Mobile
4-6GB RAM
Laptop
16GB RAM
Server
GPU
Minimum Recommended
30GB+ RAM

Code Examples

Inference (python, transformers)
from transformers import AutoTokenizer, AutoImageProcessor
from transformers.models.qwen2_vl.image_processing_qwen2_vl_fast import smart_resize
from PIL import Image
from io import BytesIO
import base64
import re
from vllm import LLM, SamplingParams

# Demo inputs: the instruction given to the model and the screenshot it refers to.
instruction = "click start"
image_path = "example.png"

# Patterns that capture integer (x, y) coordinates from a click call in the
# model output. They are tried in this order by parse_xy_from_text:
#   1. keyword form    -> pyautogui.click(x=123, y=456)
#   2. positional form -> pyautogui.click(123, 456) or click(123,456)
CLICK_REGEXES = [
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        r"click\s*\(\s*x\s*=\s*(\d+)\s*,\s*y\s*=\s*(\d+)\s*\)",
        r"click\s*\(\s*(\d+)\s*,\s*(\d+)\s*\)",
    )
]

def format_message(image_path, instruction, prompt_tokenizer=None):
    """Render the chat prompt string that is passed to vLLM for one screenshot.

    The system prompt plus a user turn (image part followed by the text
    instruction) are rendered through the tokenizer's chat template. Every
    ``<|media_begin|>...<|media_end|>`` span in the rendered text is then
    replaced with ``<|vision_start|><|image_pad|><|vision_end|>``.

    Args:
        image_path: Value stored in the image part of the user message.
        instruction: Natural-language task for the model.
        prompt_tokenizer: Object exposing ``apply_chat_template``. When omitted,
            the module-level ``prompt_tok`` is used, which keeps existing
            two-argument callers working.

    Returns:
        The prompt text with the media span rewritten.

    Raises:
        RuntimeError: If the rendered template contains no media span.
    """
    if prompt_tokenizer is None:
        # Backward-compatible default: the tokenizer created by the script body.
        prompt_tokenizer = prompt_tok

    SYSTEM_PROMPT = (
        "You are a GUI agent. You are given a task and a screenshot of the screen. "
        "You need to perform a series of pyautogui actions to complete the task."
    )
    messages = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": [
            {"type": "image", "image": image_path},
            {"type": "text", "text": instruction},
        ]},
    ]
    text = prompt_tokenizer.apply_chat_template(
        messages, tokenize=False, add_generation_prompt=True
    )

    # re.S lets ".*?" span newlines inside the media block.
    text2, n = re.subn(
        r"<\|media_begin\|>.*?<\|media_end\|>",
        "<|vision_start|><|image_pad|><|vision_end|>",
        text,
        flags=re.S,
    )
    if n == 0:
        raise RuntimeError("Cannot find <|media_begin|>...<|media_end|> token.")
    return text2

def parse_xy_from_text(text: str):
    """Pull the first click coordinates out of a model response.

    Each pattern in ``CLICK_REGEXES`` is tried in order; the first one that
    matches yields an ``(x, y)`` tuple of ints. When the text does not mention
    "click" at all, or no pattern matches, the list ``[-1, -1]`` is returned.
    """
    if "click" in text.lower():
        for pattern in CLICK_REGEXES:
            hit = pattern.search(text)
            if hit is None:
                continue
            try:
                x, y = (int(group) for group in hit.group(1, 2))
            except Exception:
                # Unparseable capture: fall through to the next pattern.
                continue
            return x, y
    return [-1, -1]

def convert_pil_image_to_base64(image):
    """Save *image* as PNG into memory and return the bytes base64-encoded as text."""
    with BytesIO() as png_buffer:
        image.save(png_buffer, format="PNG")
        png_bytes = png_buffer.getvalue()
    return base64.b64encode(png_bytes).decode()

# Offline vLLM engine: bf16 weights, at most one image per prompt,
# tensor_parallel_size=1.
llm = LLM(
    model="Salesforce/GTA1-32B",
    tokenizer="Salesforce/GTA1-32B",
    tokenizer_mode="slow",
    trust_remote_code=True,
    dtype="bfloat16",
    limit_mm_per_prompt={"image": 1},
    tensor_parallel_size=1,
)
# Tokenizer that format_message uses to render the chat template.
prompt_tok = AutoTokenizer.from_pretrained("Salesforce/GTA1-32B", trust_remote_code=True)
# temperature=0.0 with a 512-token generation cap.
sp = SamplingParams(max_tokens=512, temperature=0.0)
tokenizer = llm.get_tokenizer()
processor = AutoImageProcessor.from_pretrained("Salesforce/GTA1-32B", trust_remote_code=True)

# Resize the screenshot with smart_resize using the image processor's own
# patch/merge factor and pixel bounds; the resized image is what the model sees.
image = Image.open(image_path).convert('RGB')
resized_height, resized_width = smart_resize(
    image.height,
    image.width,
    factor=processor.patch_size * processor.merge_size,
    min_pixels=processor.min_pixels,
    max_pixels=processor.max_pixels,
)
resized_image = image.resize((resized_width, resized_height))
messages = format_message(image_path, instruction)
response = llm.generate(
    [{"prompt": messages, "multi_modal_data": {"image": [resized_image]}}],
    sampling_params=sp,
)[0].outputs[0].text


coordinates = parse_xy_from_text(response)
if coordinates[0] < 0 or coordinates[1] < 0:
    # parse_xy_from_text returns [-1, -1] when no click was found; scaling that
    # sentinel would print meaningless negative coordinates.
    print(f"No click action found in model response: {response!r}")
else:
    # The parsed coordinates are scaled from the resized image's dimensions to
    # the original image's dimensions.
    print(coordinates[0]/resized_width*image.width, coordinates[1]/resized_height*image.height)
Model Serving (python, transformers)
import torch
import os
# -------------------------
# System / Torch defaults
# -------------------------
os.environ.setdefault("TOKENIZERS_PARALLELISM", "false")  # avoid CPU oversubscription
os.environ.setdefault("VLLM_USE_V1", "1")
os.environ.setdefault("VLLM_ENGINE_IN_BACKGROUND_THREAD", "0")
import base64
import re
from typing import Dict, List, Union
from PIL import Image
from io import BytesIO
import traceback
import argparse
import asyncio
import requests
import ray
from ray import serve
from fastapi import FastAPI
from transformers import AutoTokenizer
from vllm import LLM, SamplingParams
import uuid


# Default replica count. It is read by the /health handler to size its
# oversampling, and reassigned from --num_replicas in the __main__ section.
N_REPLICAS = 2

# Torch backend tuning: allow TF32 in CUDA matmuls and turn on cuDNN benchmark
# mode. Any exception raised while setting these flags is swallowed, so module
# import continues without them.
try:
    torch.backends.cuda.matmul.allow_tf32 = True
    torch.backends.cudnn.benchmark = True
except Exception:
    pass


# -------------------------
# IO helpers
# -------------------------

def pil_to_base64(img: Image.Image, format: str = "PNG") -> str:
    """Encode *img* in the given image *format* and return it as base64 text."""
    with BytesIO() as sink:
        img.save(sink, format=format)
        encoded = base64.b64encode(sink.getvalue())
    return encoded.decode("utf-8")


def data_uri_to_pil(data_uri: str) -> "Image.Image":
    """Decode a ``<header>,<base64>`` data URI into a PIL image.

    Only the part after the first comma is used; the header is not inspected.

    Args:
        data_uri: String such as ``data:image/png;base64,....``.

    Returns:
        The image opened from the decoded bytes.

    Raises:
        ValueError: If *data_uri* contains no comma separating header and
            payload (previously this surfaced as an opaque tuple-unpacking
            ValueError).
    """
    _header, separator, b64_str = data_uri.partition(",")
    if not separator:
        raise ValueError("Invalid data URI: expected '<header>,<base64 payload>'")
    img_data = base64.b64decode(b64_str)
    return Image.open(BytesIO(img_data))


def extract_images(messages: List[Dict]) -> "List[Image.Image]":
    """Collect every image attached to user messages, converted to RGB.

    Two content-part shapes are recognised:
    ``{"type": "image", "image": <data URI>}`` and
    ``{"type": "image_url", "image_url": {"url": <data URI>}}``.

    User messages whose ``content`` is not a list (for example a plain string
    or ``None``) carry no image parts and are skipped instead of raising.

    Args:
        messages: Chat messages, each a dict with ``role`` and ``content``.

    Returns:
        Images in the order they appear across the user messages.
    """
    images = []
    for msg in messages:
        if msg.get("role") != "user":
            continue
        parts = msg.get("content")
        if not isinstance(parts, (list, tuple)):
            # String/None content: nothing to scan for image parts.
            continue
        for part in parts:
            if not isinstance(part, dict):
                continue
            kind = part.get("type")
            if kind == "image":
                data_uri = part["image"]
            elif kind == "image_url":
                data_uri = part["image_url"]["url"]
            else:
                continue
            images.append(data_uri_to_pil(data_uri).convert("RGB"))
    return images


# -------------------------
# Prompt builder
# -------------------------

def build_prompt_with_template(tokenizer: AutoTokenizer, messages: List[Dict]) -> str:
    """Render *messages* through the chat template and rewrite the media span.

    Every ``<|media_begin|>...<|media_end|>`` span in the rendered text is
    replaced with ``<|vision_start|><|image_pad|><|vision_end|>``.

    Raises:
        RuntimeError: If the rendered text contains no media span.
    """
    rendered = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    media_block = r"<\|media_begin\|>.*?<\|media_end\|>"
    vision_block = "<|vision_start|><|image_pad|><|vision_end|>"
    # re.S lets ".*?" cross newlines inside the media block.
    prompt, replaced = re.subn(media_block, vision_block, rendered, flags=re.S)
    if not replaced:
        raise RuntimeError("Did not find <|media_begin|>...<|media_end|> block in template.")
    return prompt

# -------------------------
# Deployment
# -------------------------

def build_app(model_path: str, num_replicas: int, port: int):
    """Build the Ray Serve application: GPU model replicas behind a FastAPI ingress.

    Args:
        model_path: Model id or path passed to vLLM and to the HF tokenizer.
        num_replicas: Number of ``GTA1Model`` replicas; each requests one GPU
            and four CPUs from Ray.
        port: Not used inside this function (the HTTP port is set by the
            caller through ``serve.start``); kept for interface compatibility.

    Returns:
        The bound ``GTA1App`` ingress deployment, ready for ``serve.run``.
    """
    api = FastAPI(title="GTA1-32B Multi-GPU Service (High-throughput)")

    @serve.deployment(
        num_replicas=num_replicas,
        ray_actor_options={"num_gpus": 1, "num_cpus": 4},
        max_ongoing_requests=16,
    )
    class GTA1Model:
        """One vLLM engine per replica, plus the tokenizers needed to build prompts."""

        def __init__(self, model_path: str):
            gpu_ids = ray.get_gpu_ids()
            self.gpu_id = gpu_ids[0] if gpu_ids else 0
            print(f"🔍 Ray assigned GPU IDs: {gpu_ids}")
            # Initialize vLLM within this replica (Ray sets CUDA_VISIBLE_DEVICES)
            print(f"🔄 Initializing vLLM on GPU {self.gpu_id}[ray id] from {model_path}")
            if not torch.cuda.is_available():
                raise RuntimeError("CUDA is not available")

            self.llm = LLM(
                model=model_path,
                tokenizer=model_path,
                tokenizer_mode="slow",
                trust_remote_code=True,
                dtype="bfloat16",
                limit_mm_per_prompt={"image": 1},
                max_model_len=32768,
                tensor_parallel_size=1,
            )
            # vLLM's tokenizer is used for token checks and counts; the HF
            # tokenizer is used to render the chat template.
            self.vllm_tokenizer = self.llm.get_tokenizer()
            self.hf_tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
            self.model_path = model_path
            self.dtype = "bfloat16"
            print(f"✅ vLLM initialized successfully (Ray GPU Id: {self.gpu_id})")

        # ------------ batching core ------------
        @serve.batch(max_batch_size=8, batch_wait_timeout_s=0.1)  # increase if GPU allows
        async def _generate_batch(self, payload: Union[Dict, List[Dict]]):
            """Build prompts, enforce single image, and call vLLM.generate.

            Returns one result dict per input payload, in input order. A
            payload that fails validation gets an error result in its own
            slot while the remaining payloads are still generated; a batch
            handler must hand back exactly as many results as it received.
            Sampling parameters are read from each payload individually.
            """
            list_of_payloads = [payload] if isinstance(payload, dict) else payload
            request_id = uuid.uuid4().hex[:8]

            tok = self.vllm_tokenizer
            # Loop-invariant token ids for the media sanity check below.
            id_imgpad = tok.encode("<|image_pad|>", add_special_tokens=False)[0]
            id_media = tok.encode("<|media_placeholder|>", add_special_tokens=False)[0]

            # results[i] always corresponds to list_of_payloads[i].
            results = [None] * len(list_of_payloads)
            valid_slots = []
            requests_list = []
            sampling_params = []
            prompt_token_counts = []

            # --- Build per-sample prompt/image ---
            for slot, p in enumerate(list_of_payloads):
                try:
                    messages = p["messages"]
                    imgs = extract_images(messages)
                    if len(imgs) != 1:
                        raise RuntimeError(f"Exactly one image is required, got {len(imgs)}")
                    prompt_text = build_prompt_with_template(self.hf_tokenizer, messages)
                    # Sanity check on tokens: 1 <|image_pad|>, no <|media_placeholder|>
                    ids = tok.encode(prompt_text, add_special_tokens=False)
                    if sum(i == id_imgpad for i in ids) != 1 or any(i == id_media for i in ids):
                        raise RuntimeError("Prompt media tokens invalid after conversion")
                    sp = SamplingParams(
                        max_tokens=p.get("max_new_tokens", 512),
                        temperature=p.get("temperature", 0.0),
                        top_p=p.get("top_p", 0.9),
                    )
                except Exception as e:
                    trace = traceback.format_exc()
                    results[slot] = {
                        "response": "",
                        "error": {
                            "message": str(e),
                            "trace": trace,
                            'type_of_payload': str(type(payload)),
                            'type_of_list_of_payloads': str(type(list_of_payloads)),
                            'type_of_p': str(type(p)),
                            'p_keys': str(p.keys()) if isinstance(p, dict) else str(p),
                        },
                        "usage": {},
                        "gpu_id": self.gpu_id,
                    }
                    continue
                valid_slots.append(slot)
                requests_list.append({"prompt": prompt_text, "multi_modal_data": {"image": [imgs[0]]}})
                sampling_params.append(sp)
                # Reuse the encoding from the sanity check for usage reporting.
                prompt_token_counts.append(len(ids))

            # --- vLLM generation (valid payloads only) ---
            if requests_list:
                outs = self.llm.generate(requests_list, sampling_params=sampling_params)
                for slot, prompt_tokens, o in zip(valid_slots, prompt_token_counts, outs):
                    text = o.outputs[0].text if o.outputs else ""
                    if o.outputs and hasattr(o.outputs[0], 'token_ids'):
                        gen_tokens = len(o.outputs[0].token_ids)
                    else:
                        gen_tokens = None
                    usage = {
                        "prompt_tokens": prompt_tokens,
                        "generated_tokens": gen_tokens,
                        "total_tokens": (prompt_tokens + gen_tokens) if gen_tokens is not None else None,
                    }
                    results[slot] = {
                        "response": text,
                        "error": "",
                        "usage": usage,
                        "gpu_id": self.gpu_id,
                        'bs_size_in_this_request': f"{request_id}:{len(list_of_payloads)}",
                    }

            return results

        # Exposed single-call entry that joins the batch
        async def call_llm(self, payload: Dict):
            """Submit one payload to the batched generator; failures come back as an error dict."""
            try:
                res = await self._generate_batch(payload)
                return res
            except Exception as e:
                trace = traceback.format_exc()
                return {"response": "", "error": {"message": str(e), "trace": trace}, "usage": {}, "gpu_id": self.gpu_id}

        def health(self):
            """Return static status information for this replica."""
            return {
                "status": "ok",
                "gpu_id": self.gpu_id,
                "dtype": self.dtype,
                "model_path": self.model_path,
            }

    model = GTA1Model.bind(model_path)

    @serve.deployment(max_ongoing_requests=96)
    @serve.ingress(api)
    class GTA1App:
        """HTTP ingress that forwards requests to the model deployment handle."""

        def __init__(self, model_handle):
            self.model_deployment = model_handle

        @api.get("/health")
        async def health_all(self):
            """Sample replica health and report up to ``num_replicas`` distinct GPUs."""
            # Calling the same Serve handle N times does not guarantee each call hits a different replica
            # Uses the num_replicas this app was built with rather than the
            # module-level N_REPLICAS, so the two cannot disagree.
            attempts = max(8, num_replicas * 4)  # oversample
            calls = [self.model_deployment.health.remote() for _ in range(attempts)]
            replies = await asyncio.gather(*calls)
            # dedupe by gpu_id
            seen = {}
            for r in replies:
                seen[r.get("gpu_id", f"unknown-{len(seen)}")] = r
                if len(seen) >= num_replicas:
                    break
            return {"replicas": list(seen.values())}

        @api.post("/call_llm")
        async def call_llm(self, req: Dict):
            """Forward the JSON body to a model replica and return its result."""
            return await self.model_deployment.call_llm.remote(req)

    return GTA1App.bind(model)


# -------------------------
# Main
# -------------------------
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--model_path", type=str, default="Salesforce/GTA1-32B")
    parser.add_argument("--host", type=str, default="0.0.0.0")
    parser.add_argument("--port", type=int, default=3005)
    parser.add_argument("--num_replicas", type=int, default=2)
    args = parser.parse_args()
    # Keep the module-level default in sync with the CLI value.
    N_REPLICAS = args.num_replicas
    ray.init(ignore_reinit_error=True)

    print(f"🚀 Starting GTA1-32B service on {args.host}:{args.port}")
    serve.start(detached=True, http_options={"host": args.host, "port": args.port})

    app = build_app(args.model_path, args.num_replicas, args.port)
    serve.run(app, name="GTA1-32B", route_prefix="/")

    # Quick health sample
    # 0.0.0.0 is a bind-all listen address, not a portable destination, so probe
    # loopback in that case; otherwise probe the host the server was bound to.
    probe_host = "127.0.0.1" if args.host == "0.0.0.0" else args.host
    try:
        r = requests.get(f"http://{probe_host}:{args.port}/health", timeout=5)
        print(r.json())
    except Exception as e:
        # Best-effort probe: report the failure without stopping the service.
        print("Health probe failed:", e)
python
import argparse
import base64
import concurrent.futures
import json
import os
import re
from typing import Dict, List, Tuple
from gui_agent.agent.gta1.format_message import encode_numpy_image_to_base64, encode_image_bytes, smart_resize

import requests
from PIL import Image, ImageDraw


def image_file_to_data_uri(image_path: str) -> str:
    """Read an image file and return it as a base64 ``data:`` URI.

    The MIME type in the header is guessed from the file extension and falls
    back to ``image/png`` when the guess is missing or not an image type.

    Args:
        image_path: Path of the image file to embed.

    Returns:
        A string of the form ``data:<mime>;base64,<payload>``.

    Raises:
        FileNotFoundError: If *image_path* does not exist.
    """
    import mimetypes  # function-scope import keeps this block self-contained

    if not os.path.exists(image_path):
        raise FileNotFoundError(f"Image not found: {image_path}")
    with open(image_path, "rb") as f:
        b64 = base64.b64encode(f.read()).decode("utf-8")
    mime_type, _encoding = mimetypes.guess_type(image_path)
    if not mime_type or not mime_type.startswith("image/"):
        # default to png; serverside only requires a data URI header then comma
        mime_type = "image/png"
    return f"data:{mime_type};base64,{b64}"


def build_messages(image_path: str, instruction: str, system_prompt: str) -> List[Dict]:
    """Assemble the two-turn chat payload: system prompt, then user image + text."""
    system_turn = {"role": "system", "content": system_prompt}
    image_part = {"type": "image", "image": image_file_to_data_uri(image_path)}
    text_part = {"type": "text", "text": instruction}
    user_turn = {"role": "user", "content": [image_part, text_part]}
    return [system_turn, user_turn]


def call_health(base_url: str, timeout: float = 10.0) -> Dict:
    """GET ``<base_url>/health`` and return the decoded JSON body.

    Raises whatever ``raise_for_status`` raises on a non-success response.
    """
    health_url = f"{base_url}/health"
    response = requests.get(health_url, timeout=timeout)
    response.raise_for_status()
    return response.json()


def call_single(
    base_url: str,
    image_path: str,
    instruction: str,
    system_prompt: str,
    max_new_tokens: int = 512,
    temperature: float = 0.0,
    top_p: float = 0.9,
    timeout: float = 120.0,
) -> List[Dict]:
    """POST one request to ``<base_url>/call_llm`` and return a list of results.

    The JSON body holds the chat messages plus the sampling settings. A dict
    response is wrapped in a one-element list; any other response is returned
    as received.
    """
    request_body = dict(
        messages=build_messages(image_path, instruction, system_prompt),
        max_new_tokens=max_new_tokens,
        temperature=temperature,
        top_p=top_p,
    )
    response = requests.post(f"{base_url}/call_llm", json=request_body, timeout=timeout)
    response.raise_for_status()
    decoded = response.json()
    return [decoded] if isinstance(decoded, dict) else decoded


def call_many_concurrent(
    base_url: str,
    image_path: str,
    instruction: str,
    system_prompt: str,
    num_requests: int,
    concurrency: int,
    max_new_tokens: int = 512,
    temperature: float = 0.0,
    top_p: float = 0.9,
    timeout: float = 120.0,
) -> List[List[Dict]]:
    """Send *num_requests* calls through a thread pool and collect the results.

    Results are returned in submission order (element ``i`` belongs to request
    ``i + 1``), so callers that label results by index stay accurate.

    Args:
        base_url: Service root, e.g. ``http://localhost:3005``.
        image_path: Image sent with every request.
        instruction: Base instruction; a ``[req i/N]`` suffix is appended.
        system_prompt: System message for every request.
        num_requests: How many requests to send; ``<= 0`` returns ``[]``.
        concurrency: Maximum worker threads; values below 1 are treated as 1.
        max_new_tokens, temperature, top_p, timeout: Forwarded to call_single.

    Returns:
        One result list per request, in request order.
    """
    def _one(i: int) -> List[Dict]:
        # Vary instruction slightly so you can trace requests
        instr = f"{instruction} [req {i+1}/{num_requests}]"
        return call_single(
            base_url,
            image_path,
            instr,
            system_prompt,
            max_new_tokens,
            temperature,
            top_p,
            timeout,
        )

    if num_requests <= 0:
        return []
    # ThreadPoolExecutor rejects max_workers < 1, so clamp the pool size.
    with concurrent.futures.ThreadPoolExecutor(max_workers=max(1, concurrency)) as pool:
        # map() yields results in submission order, unlike as_completed().
        return list(pool.map(_one, range(num_requests)))


def pretty_print_response(batch_results: List[Dict]) -> None:
    """Print one line block per result: the error if present, else the text.

    Args:
        batch_results: Result dicts as returned by the service; a single dict
            is accepted and treated as a one-element list.
    """
    if isinstance(batch_results, dict):
        batch_results = [batch_results]
    for idx, item in enumerate(batch_results):
        error = item.get("error")
        if error:
            # The original format string ended with a stray ")" in the output.
            print(f"[#{idx}] ERROR: {json.dumps(error, ensure_ascii=False)}")
        else:
            usage = item.get("usage", {})
            # "or ''" guards against an explicit null response.
            text = (item.get('response') or '').strip()
            print(f"[#{idx}] gpu={item.get('gpu_id')} tokens={usage} text=\n{text}\n")

# pyautogui.click(x=<int>, y=<int>)
CLICK_KWARGS_REGEX = re.compile(r"pyautogui\.click\(\s*x\s*=\s*(\d+)\s*,\s*y\s*=\s*(\d+)\s*\)")
# pyautogui.click(<int>, <int>)
CLICK_POSARGS_REGEX = re.compile(r"pyautogui\.click\(\s*(\d+)\s*,\s*(\d+)\s*\)")


def extract_clicks_from_text(text: str) -> List[Tuple[int, int]]:
    """Return every click found in *text* as ``(x, y)`` int pairs.

    All keyword-form matches come first, followed by all positional-form
    matches. A falsy *text* is treated as the empty string.
    """
    haystack = text or ""
    return [
        (int(x), int(y))
        for pattern in (CLICK_KWARGS_REGEX, CLICK_POSARGS_REGEX)
        for x, y in pattern.findall(haystack)
    ]

def extract_clicks_from_results(result_items: List[Dict]) -> List[Tuple[int, int]]:
    """Gather clicks from the ``response`` text of every non-error result.

    A single dict is accepted and handled as a one-element list. Items with a
    truthy ``error`` entry are ignored.
    """
    items = [result_items] if isinstance(result_items, dict) else result_items
    return [
        click
        for item in items
        if not item.get("error")
        for click in extract_clicks_from_text(item.get("response", ""))
    ]

def compute_resized_dims_for_server_mapping(
    image_path: str,
    factor: int = 28,
    min_pixels: int = 1000,
    max_pixels: int = 1000000000000,
) -> Tuple[int, int, int, int]:
    """Return the original and ``smart_resize``-d dimensions of an image.

    The resize settings were previously hard-coded; they are now parameters
    with the same defaults so callers can pass other values.
    NOTE(review): the defaults are assumed to mirror the resizing applied on
    the serving side — confirm against the deployed image processor.

    Args:
        image_path: Image whose size is read.
        factor: Passed to ``smart_resize`` as ``factor``.
        min_pixels: Passed to ``smart_resize`` as ``min_pixels``.
        max_pixels: Passed to ``smart_resize`` as ``max_pixels``.

    Returns:
        ``(width, height, resized_width, resized_height)``.
    """
    with Image.open(image_path) as im:
        width, height = im.size
    # smart_resize takes and returns (height, width) order.
    resized_H, resized_W = smart_resize(
        height,
        width,
        factor=factor,
        min_pixels=min_pixels,
        max_pixels=max_pixels,
    )
    return width, height, int(resized_W), int(resized_H)

def map_clicks_to_original(clicks_resized: List[Tuple[int, int]],
                           original_w: int,
                           original_h: int,
                           resized_w: int,
                           resized_h: int) -> List[Tuple[int, int]]:
    """Scale click points from resized-image space back to the original image.

    Each coordinate is multiplied by ``original / resized`` for its axis and
    rounded to an int. A zero resized dimension yields an empty list.
    """
    if resized_w == 0 or resized_h == 0:
        return []
    ratio_x = original_w / float(resized_w)
    ratio_y = original_h / float(resized_h)
    return [
        (int(round(px * ratio_x)), int(round(py * ratio_y)))
        for px, py in clicks_resized
    ]

def draw_circles_on_image(image_path: str,
                          points: List[Tuple[int, int]],
                          output_path: str,
                          radius: int = 8,
                          color: Tuple[int, int, int] = (255, 0, 0),
                          width: int = 3) -> None:
    """Draw a filled circle at each point and save the annotated image.

    Does nothing when *points* is empty. The output directory is created if it
    does not exist, and the source file handle is closed once the image has
    been converted to RGB.

    Args:
        image_path: Source image to annotate.
        points: ``(x, y)`` centers in the source image's pixel coordinates.
        output_path: Where the annotated image is written.
        radius: Circle radius in pixels.
        color: Outline color as an RGB triple.
        width: Outline width in pixels.
    """
    if not points:
        return
    out_dir = os.path.dirname(output_path)
    if out_dir:
        # Saving into a missing directory would otherwise fail.
        os.makedirs(out_dir, exist_ok=True)
    # convert() returns a new in-memory image, so the file can be closed here.
    with Image.open(image_path) as src:
        img = src.convert("RGB")
    drawer = ImageDraw.Draw(img)
    for (x, y) in points:
        left = x - radius
        top = y - radius
        right = x + radius
        bottom = y + radius
        drawer.ellipse([(left, top), (right, bottom)], outline=color, fill=(0,255,0), width=width)
    img.save(output_path)
    print(f"Annotated image saved to: {output_path} (points drawn: {len(points)})")

# Default system message sent with every request (overridable via --system).
SYSTEM_PROMPT = (
    "You are a GUI agent. "
    "You are given a task and a screenshot of the screen. "
    "You need to perform a series of pyautogui actions to complete the task."
)
def main():
    """CLI entry point: run a health check, a single request, or a concurrent batch.

    In ``single`` and ``batch`` modes the responses are printed, any clicks
    found in them are mapped back to the original image, and an annotated copy
    is written to ``ray_serve/annotated.png``.
    """
    cli = argparse.ArgumentParser(description="Examples: single and batched inference against GTA1-32B Ray Serve.")
    cli.add_argument("--host", type=str, default="http://localhost", help="Ray Serve host, e.g. http://localhost or http://IP")
    cli.add_argument("--port", type=int, default=3005, help="Ray Serve port")
    cli.add_argument("--image", type=str, required=False, default="example.jpg", help="Path to input image")
    cli.add_argument("--instruction", type=str, default="click the icon in the bottom row, third from the left", help="User instruction")
    cli.add_argument("--system", type=str, default=SYSTEM_PROMPT)
    cli.add_argument("--mode", type=str, choices=["single", "batch", "health"], default="batch")
    cli.add_argument("--num_requests", type=int, default=8, help="Number of requests in batch mode")
    cli.add_argument("--concurrency", type=int, default=8, help="Max concurrent HTTP calls in batch mode")
    cli.add_argument("--max_new_tokens", type=int, default=512)
    cli.add_argument("--temperature", type=float, default=0.0)
    cli.add_argument("--top_p", type=float, default=0.9)
    cli.add_argument("--timeout", type=float, default=180.0)
    args = cli.parse_args()

    base_url = f"{args.host}:{args.port}"

    def annotate(clicks_resized: List[Tuple[int, int]]) -> None:
        """Map clicks to the original image and save the annotated copy, if any."""
        if not clicks_resized:
            return
        orig_w, orig_h, resized_w, resized_h = compute_resized_dims_for_server_mapping(args.image)
        mapped_clicks = map_clicks_to_original(clicks_resized, orig_w, orig_h, resized_w, resized_h)
        draw_circles_on_image(args.image, mapped_clicks, "ray_serve/annotated.png")

    if args.mode == "health":
        print(json.dumps(call_health(base_url, timeout=10.0), indent=2))
        return

    # Settings shared by the single and batch request paths.
    request_options = dict(
        base_url=base_url,
        image_path=args.image,
        instruction=args.instruction,
        system_prompt=args.system,
        max_new_tokens=args.max_new_tokens,
        temperature=args.temperature,
        top_p=args.top_p,
        timeout=args.timeout,
    )

    if args.mode == "single":
        result_list = call_single(**request_options)
        print(result_list)
        pretty_print_response(result_list)
        annotate(extract_clicks_from_results(result_list))
        return

    if args.mode == "batch":
        print(f"Submitting {args.num_requests} requests with concurrency={args.concurrency}...")
        batch_outs = call_many_concurrent(
            num_requests=args.num_requests,
            concurrency=args.concurrency,
            **request_options,
        )
        for position, one_result in enumerate(batch_outs, start=1):
            print(f"===== Result for request {position} =====")
            pretty_print_response(one_result)
        all_clicks_resized: List[Tuple[int, int]] = []
        for one_result in batch_outs:
            all_clicks_resized.extend(extract_clicks_from_results(one_result))
        annotate(all_clicks_resized)
        return


if __name__ == "__main__":
    main()

Deploy This Model

Production-ready deployment in minutes

Together.ai

Instant API access to this model

Fastest API

Production-ready inference API. Start free, scale to millions.

Try Free API

Replicate

One-click model deployment

Easiest Setup

Run models in the cloud with simple API. No DevOps required.

Deploy Now

Disclosure: We may earn a commission from these partners. This helps keep LLMYourWay free.