GTA1-32B
1.4K
5
32.0B
1 language
license:mit
by
Salesforce
Image Model
OTHER
32B params
New
1K downloads
Early-stage
Edge AI:
Mobile
Laptop
Server
72GB+ RAM
Mobile
Laptop
Server
Quick Summary
Reinforcement learning (RL) (e.
Device Compatibility
Mobile
4-6GB RAM
Laptop
16GB RAM
Server
GPU
Minimum Recommended
30GB+ RAM
Code Examples
Inference (python, transformers)
from transformers import AutoTokenizer, AutoImageProcessor
from transformers.models.qwen2_vl.image_processing_qwen2_vl_fast import smart_resize
from PIL import Image
from io import BytesIO
import base64
import re
from vllm import LLM, SamplingParams
# Demo inputs: the natural-language task and the screenshot it applies to.
instruction = "click start"
image_path = "example.png"

# Source patterns for the click-call spellings looked for in model output.
# Each pattern captures the x coordinate in group 1 and y in group 2.
_CLICK_PATTERN_SOURCES = (
    # keyword form: pyautogui.click(x=123, y=456)
    r"click\s*\(\s*x\s*=\s*(\d+)\s*,\s*y\s*=\s*(\d+)\s*\)",
    # positional form: pyautogui.click(123, 456) or click(123,456)
    r"click\s*\(\s*(\d+)\s*,\s*(\d+)\s*\)",
)
# Compiled case-insensitively, in the order they are tried.
CLICK_REGEXES = [re.compile(source, re.IGNORECASE) for source in _CLICK_PATTERN_SOURCES]
def format_message(image_path, instruction, tokenizer=None):
    """Render the chat prompt for one screenshot + instruction pair.

    The chat template is applied to a system message and a user message that
    holds the image reference and the instruction text. Every
    ``<|media_begin|>...<|media_end|>`` span in the rendered text is then
    replaced by ``<|vision_start|><|image_pad|><|vision_end|>``.

    Args:
        image_path: Value stored under the ``"image"`` key of the user message.
        instruction: Task text stored under the ``"text"`` key.
        tokenizer: Object exposing ``apply_chat_template``. When omitted, the
            module-level ``prompt_tok`` is used, which keeps existing
            two-argument calls working.

    Returns:
        The prompt string with the media span rewritten.

    Raises:
        RuntimeError: If the rendered template contains no media span.
    """
    system_prompt = (
        "You are a GUI agent. You are given a task and a screenshot of the screen. "
        "You need to perform a series of pyautogui actions to complete the task."
    )
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": [
            {"type": "image", "image": image_path},
            {"type": "text", "text": instruction},
        ]},
    ]
    # Resolve the global lazily so the function can be defined (and tested)
    # before the module-level tokenizer exists.
    tok = prompt_tok if tokenizer is None else tokenizer
    text = tok.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    # re.S lets the span cross newlines inside the media block.
    converted, n_replaced = re.subn(
        r"<\|media_begin\|>.*?<\|media_end\|>",
        "<|vision_start|><|image_pad|><|vision_end|>",
        text,
        flags=re.S,
    )
    if n_replaced == 0:
        raise RuntimeError("Cannot find <|media_begin|>...<|media_end|> token.")
    return converted
def parse_xy_from_text(text: str):
    """Extract the first click coordinate from model output.

    Each pattern in ``CLICK_REGEXES`` is tried in order and the first match
    wins.

    Args:
        text: Generated text; ``None`` or an empty string is treated as
            "no click found".

    Returns:
        A two-element list ``[x, y]`` of ints, or ``[-1, -1]`` when no click
        call is found. The result is always a list so callers see one type on
        both the success and the failure path.
    """
    if not text or "click" not in text.lower():
        return [-1, -1]
    for rx in CLICK_REGEXES:
        m = rx.search(text)
        if not m:
            continue
        try:
            return [int(m.group(1)), int(m.group(2))]
        except ValueError:
            # e.g. a digit run longer than the interpreter's int-parsing
            # limit; fall through to the next pattern.
            continue
    return [-1, -1]
def convert_pil_image_to_base64(image):
    """Return ``image`` saved as PNG and encoded as a base64 string."""
    png_stream = BytesIO()
    image.save(png_stream, format="PNG")
    png_bytes = png_stream.getvalue()
    encoded = base64.b64encode(png_bytes)
    return encoded.decode()
# Single source for the checkpoint id used by the engine, tokenizer and
# image processor below.
MODEL_ID = "Salesforce/GTA1-32B"

# vLLM engine: one image per prompt, bfloat16 weights, no tensor parallelism.
llm = LLM(
    model=MODEL_ID,
    tokenizer=MODEL_ID,
    tokenizer_mode="slow",
    trust_remote_code=True,
    dtype="bfloat16",
    limit_mm_per_prompt={"image": 1},
    tensor_parallel_size=1,
)
# Tokenizer used by format_message to render the chat template.
prompt_tok = AutoTokenizer.from_pretrained(MODEL_ID, trust_remote_code=True)
# Temperature 0.0, at most 512 generated tokens.
sp = SamplingParams(max_tokens=512, temperature=0.0)
tokenizer = llm.get_tokenizer()
processor = AutoImageProcessor.from_pretrained(MODEL_ID, trust_remote_code=True)

# Load the screenshot and resize it with the processor's own limits; the
# resized dimensions are needed later to scale coordinates back.
image = Image.open(image_path).convert("RGB")
resize_factor = processor.patch_size * processor.merge_size
resized_height, resized_width = smart_resize(
    image.height,
    image.width,
    factor=resize_factor,
    min_pixels=processor.min_pixels,
    max_pixels=processor.max_pixels,
)
resized_image = image.resize((resized_width, resized_height))

# Build the prompt, run one generation and pull the click out of the text.
messages = format_message(image_path, instruction)
generation_request = {"prompt": messages, "multi_modal_data": {"image": [resized_image]}}
generations = llm.generate([generation_request], sampling_params=sp)
response = generations[0].outputs[0].text
coordinates = parse_xy_from_text(response)
print(coordinates[0]/resized_width*image.width, coordinates[1]/resized_height*image.height)

Model Serving (python, transformers)
import torch
import os
# -------------------------
# System / Torch defaults
# -------------------------
# setdefault is used, so values already present in the environment win.
# These are applied here, ahead of the remaining imports in this file.
os.environ.setdefault("TOKENIZERS_PARALLELISM", "false")  # avoid CPU oversubscription
# NOTE(review): presumably selects the vLLM V1 engine — confirm against the installed vLLM version.
os.environ.setdefault("VLLM_USE_V1", "1")
# NOTE(review): presumably keeps the vLLM engine out of a background thread — confirm.
os.environ.setdefault("VLLM_ENGINE_IN_BACKGROUND_THREAD", "0")
import base64
import re
from typing import Dict, List, Union
from PIL import Image
from io import BytesIO
import traceback
import argparse
import asyncio
import requests
import ray
from ray import serve
from fastapi import FastAPI
from transformers import AutoTokenizer
from vllm import LLM, SamplingParams
import uuid
# Default replica count; reassigned from --num_replicas when this file is run as a script.
N_REPLICAS = 2
# Best-effort torch backend flags (TF32 matmul, cuDNN benchmark mode).
# Any failure while setting them is deliberately ignored so startup continues.
try:
    torch.backends.cuda.matmul.allow_tf32 = True
    torch.backends.cudnn.benchmark = True
except Exception:
    pass
# -------------------------
# IO helpers
# -------------------------
def pil_to_base64(img: Image.Image, format: str = "PNG") -> str:
    """Encode ``img`` in the given image ``format`` and return it as base64 text.

    Args:
        img: Image to serialise.
        format: Format name forwarded to ``img.save`` (default ``"PNG"``).

    Returns:
        The encoded image bytes as a UTF-8 base64 string.
    """
    sink = BytesIO()
    img.save(sink, format=format)
    return base64.b64encode(sink.getvalue()).decode("utf-8")
def data_uri_to_pil(data_uri: str) -> Image.Image:
    """Decode a base64 image string into a PIL image.

    Args:
        data_uri: Either a data URI (``<header>,<base64>``) or a bare base64
            string with no header. Only the text after the first comma is
            decoded; the header itself is not inspected.

    Returns:
        The image opened from the decoded bytes.

    Raises:
        ValueError: If the payload is not valid base64 (raised by
            ``base64.b64decode``).
    """
    # partition never fails to unpack, unlike split(",", 1) on a string
    # without a comma. The base64 alphabet has no comma, so a missing
    # separator means the whole string is the payload.
    _header, separator, payload = data_uri.partition(",")
    if not separator:
        payload = data_uri
    img_data = base64.b64decode(payload)
    return Image.open(BytesIO(img_data))
def extract_images(messages: List[Dict]) -> List[Image.Image]:
    """Collect every image attached to user messages, converted to RGB.

    Two content-part shapes are recognised:
    ``{"type": "image", "image": <uri>}`` and
    ``{"type": "image_url", "image_url": {"url": <uri>}}``.

    Args:
        messages: Chat messages. Only entries with ``role == "user"`` are
            scanned. Content that is not a list (a plain string, ``None``) is
            treated as carrying no images.

    Returns:
        The decoded images in message order.
    """
    images = []
    for msg in messages:
        if msg.get("role") != "user":
            continue
        parts = msg.get("content", [])
        if not isinstance(parts, list):
            # Plain-text user content: nothing to extract, and iterating it
            # would yield characters rather than part dicts.
            continue
        for part in parts:
            if not isinstance(part, dict):
                continue
            kind = part.get("type")
            if kind == "image":
                uri = part["image"]
            elif kind == "image_url":
                uri = part["image_url"]["url"]
            else:
                continue
            images.append(data_uri_to_pil(uri).convert("RGB"))
    return images
# -------------------------
# Prompt builder
# -------------------------
def build_prompt_with_template(tokenizer: AutoTokenizer, messages: List[Dict]) -> str:
    """Render ``messages`` with the chat template and rewrite the media span.

    Args:
        tokenizer: Tokenizer whose ``apply_chat_template`` renders the prompt.
        messages: Chat messages to render.

    Returns:
        The rendered prompt with each ``<|media_begin|>...<|media_end|>`` span
        replaced by ``<|vision_start|><|image_pad|><|vision_end|>``.

    Raises:
        RuntimeError: If the rendered prompt contains no media span.
    """
    rendered = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    # re.S so the span may cross line breaks.
    media_block = re.compile(r"<\|media_begin\|>.*?<\|media_end\|>", flags=re.S)
    converted, hits = media_block.subn("<|vision_start|><|image_pad|><|vision_end|>", rendered)
    if hits == 0:
        raise RuntimeError("Did not find <|media_begin|>...<|media_end|> block in template.")
    return converted
# -------------------------
# Deployment
# -------------------------
def build_app(model_path: str, num_replicas: int, port: int):
    """Assemble the Ray Serve application: model replicas plus a FastAPI ingress.

    Args:
        model_path: Checkpoint path or hub id handed to vLLM and the tokenizer.
        num_replicas: Number of ``GTA1Model`` replicas; each one requests one
            GPU and four CPUs from Ray.
        port: Not used inside this function; kept so existing callers keep
            working.

    Returns:
        The bound ``GTA1App`` ingress deployment, ready for ``serve.run``.
    """
    api = FastAPI(title="GTA1-32B Multi-GPU Service (High-throughput)")

    @serve.deployment(
        num_replicas=num_replicas,
        ray_actor_options={"num_gpus": 1, "num_cpus": 4},
        max_ongoing_requests=16,
    )
    class GTA1Model:
        """One vLLM engine per replica, fed through a Serve batch queue."""

        def __init__(self, model_path: str):
            gpu_ids = ray.get_gpu_ids()
            self.gpu_id = gpu_ids[0] if gpu_ids else 0
            print(f"🔍 Ray assigned GPU IDs: {gpu_ids}")
            # Initialize vLLM within this replica (Ray sets CUDA_VISIBLE_DEVICES)
            print(f"🔄 Initializing vLLM on GPU {self.gpu_id}[ray id] from {model_path}")
            if not torch.cuda.is_available():
                raise RuntimeError("CUDA is not available")
            self.llm = LLM(
                model=model_path,
                tokenizer=model_path,
                tokenizer_mode="slow",
                trust_remote_code=True,
                dtype="bfloat16",
                limit_mm_per_prompt={"image": 1},
                max_model_len=32768,
                tensor_parallel_size=1,
            )
            self.vllm_tokenizer = self.llm.get_tokenizer()
            self.hf_tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
            self.model_path = model_path
            self.dtype = "bfloat16"
            print(f"✅ vLLM initialized successfully (Ray GPU Id: {self.gpu_id})")

        # ------------ batching core ------------
        @serve.batch(max_batch_size=8, batch_wait_timeout_s=0.1)  # increase if GPU allows
        async def _generate_batch(self, payload: Union[Dict, List[Dict]]):
            """Build prompts, enforce single image, and call vLLM.generate.

            Returns exactly one result dict per incoming payload, in input
            order, as the batching decorator requires. A payload that fails
            validation gets an error entry in its own slot; the remaining
            payloads are still generated. Each payload's ``max_new_tokens``,
            ``temperature`` and ``top_p`` are applied to that payload only.
            """
            list_of_payloads = [payload] if isinstance(payload, dict) else payload
            request_id = uuid.uuid4().hex[:8]
            tok = self.vllm_tokenizer

            # One slot per request so the output always lines up with the input.
            results: List = [None] * len(list_of_payloads)
            valid_slots: List[int] = []
            prompts: List[str] = []
            images_per_req: List[Image.Image] = []
            sampling_per_req: List = []
            prompt_token_counts: List[int] = []

            # --- Build per-sample prompt/image ---
            for slot, p in enumerate(list_of_payloads):
                try:
                    messages = p["messages"]
                    imgs = extract_images(messages)
                    if len(imgs) != 1:
                        raise RuntimeError(f"Exactly one image is required, got {len(imgs)}")
                    prompt_text = build_prompt_with_template(self.hf_tokenizer, messages)
                    # Sanity check on tokens: 1 <|image_pad|>, no <|media_placeholder|>
                    id_imgpad = tok.encode("<|image_pad|>", add_special_tokens=False)[0]
                    id_media = tok.encode("<|media_placeholder|>", add_special_tokens=False)[0]
                    ids = tok.encode(prompt_text, add_special_tokens=False)
                    if sum(i == id_imgpad for i in ids) != 1 or any(i == id_media for i in ids):
                        raise RuntimeError("Prompt media tokens invalid after conversion")
                    sampling = SamplingParams(
                        max_tokens=p.get("max_new_tokens", 512),
                        temperature=p.get("temperature", 0.0),
                        top_p=p.get("top_p", 0.9),
                    )
                except Exception as e:
                    results[slot] = {
                        "response": "",
                        "error": {
                            "message": str(e),
                            "trace": traceback.format_exc(),
                            'type_of_payload': str(type(payload)),
                            'type_of_list_of_payloads': str(type(list_of_payloads)),
                            'type_of_p': str(type(p)),
                            'p_keys': str(p.keys()) if isinstance(p, dict) else str(p),
                        },
                        "usage": {},
                        "gpu_id": self.gpu_id,
                    }
                    continue
                valid_slots.append(slot)
                prompts.append(prompt_text)
                images_per_req.append(imgs[0])
                sampling_per_req.append(sampling)
                # Same token ids the sanity check used; no need to re-encode later.
                prompt_token_counts.append(len(ids))

            # --- vLLM generation (only for the payloads that validated) ---
            if prompts:
                requests_list = [
                    {"prompt": pr, "multi_modal_data": {"image": [im]}}
                    for pr, im in zip(prompts, images_per_req)
                ]
                # A list of SamplingParams is paired one-to-one with the prompts.
                outs = self.llm.generate(requests_list, sampling_params=sampling_per_req)
                for slot, prompt_tokens, o in zip(valid_slots, prompt_token_counts, outs):
                    text = o.outputs[0].text if o.outputs else ""
                    gen_tokens = (
                        len(o.outputs[0].token_ids)
                        if (o.outputs and hasattr(o.outputs[0], 'token_ids'))
                        else None
                    )
                    usage = {
                        "prompt_tokens": prompt_tokens,
                        "generated_tokens": gen_tokens,
                        "total_tokens": (prompt_tokens + gen_tokens) if gen_tokens is not None else None,
                    }
                    results[slot] = {
                        "response": text,
                        "error": "",
                        "usage": usage,
                        "gpu_id": self.gpu_id,
                        'bs_size_in_this_request': f"{request_id}:{len(list_of_payloads)}",
                    }
            return results

        # Exposed single-call entry that joins the batch
        async def call_llm(self, payload: Dict):
            """Submit one payload to the batch queue and return its result dict.

            Any exception raised while batching or generating is converted
            into an error-shaped result instead of propagating.
            """
            try:
                return await self._generate_batch(payload)
            except Exception as e:
                trace = traceback.format_exc()
                return {"response": "", "error": {"message": str(e), "trace": trace}, "usage": {}, "gpu_id": self.gpu_id}

        def health(self):
            """Report this replica's GPU id, dtype and model path."""
            return {
                "status": "ok",
                "gpu_id": self.gpu_id,
                "dtype": self.dtype,
                "model_path": self.model_path,
            }

    model = GTA1Model.bind(model_path)

    @serve.deployment(max_ongoing_requests=96)
    @serve.ingress(api)
    class GTA1App:
        """HTTP ingress that forwards requests to the model deployment."""

        def __init__(self, model_handle):
            self.model_deployment = model_handle

        @api.get("/health")
        async def health_all(self):
            """Sample replica health and return one reply per distinct gpu_id."""
            # Calling the same Serve handle N times does not guarantee each call hits a different replica
            attempts = max(8, num_replicas * 4)  # oversample
            calls = [self.model_deployment.health.remote() for _ in range(attempts)]
            replies = await asyncio.gather(*calls)
            # Dedupe by gpu_id. Uses the num_replicas this app was built with
            # rather than a module global, so the cutoff matches the deployment.
            seen = {}
            for r in replies:
                seen[r.get("gpu_id", f"unknown-{len(seen)}")] = r
                if len(seen) >= num_replicas:
                    break
            return {"replicas": list(seen.values())}

        @api.post("/call_llm")
        async def call_llm(self, req: Dict):
            """Forward the JSON request body to a model replica."""
            return await self.model_deployment.call_llm.remote(req)

    return GTA1App.bind(model)
# -------------------------
# Main
# -------------------------
if __name__ == "__main__":
    # Command-line entry point: start Ray + Serve, deploy the app, then run
    # one best-effort health probe against the freshly started HTTP server.
    parser = argparse.ArgumentParser()
    parser.add_argument("--model_path", type=str, default="Salesforce/GTA1-32B")
    parser.add_argument("--host", type=str, default="0.0.0.0")
    parser.add_argument("--port", type=int, default=3005)
    parser.add_argument("--num_replicas", type=int, default=2)
    args = parser.parse_args()
    N_REPLICAS = args.num_replicas
    ray.init(ignore_reinit_error=True)
    print(f"🚀 Starting GTA1-32B service on {args.host}:{args.port}")
    serve.start(detached=True, http_options={"host": args.host, "port": args.port})
    app = build_app(args.model_path, args.num_replicas, args.port)
    serve.run(app, name="GTA1-32B", route_prefix="/")
    # Quick health sample. 0.0.0.0 is a bind-all address, not a portable
    # connect target, so probe loopback in that case and the configured host
    # otherwise.
    probe_host = "127.0.0.1" if args.host in ("0.0.0.0", "") else args.host
    try:
        r = requests.get(f"http://{probe_host}:{args.port}/health", timeout=5)
        print(r.json())
    except (requests.RequestException, ValueError) as e:
        # Connection/timeout errors or a non-JSON body: report, do not abort.
        print("Health probe failed:", e)
import argparse
import base64
import concurrent.futures
import json
import os
import re
from typing import Dict, List, Tuple
from gui_agent.agent.gta1.format_message import encode_numpy_image_to_base64, encode_image_bytes, smart_resize
import requests
from PIL import Image, ImageDraw
def image_file_to_data_uri(image_path: str) -> str:
    """Read an image file and return it as a base64 ``data:`` URI.

    The media type in the header is guessed from the file extension, so a
    ``.jpg`` file is labelled ``image/jpeg``. When the extension is unknown
    or not an image type, the header falls back to ``image/png``.

    Args:
        image_path: Path of the file to encode.

    Returns:
        ``data:<mime>;base64,<payload>``.

    Raises:
        FileNotFoundError: If ``image_path`` does not exist.
    """
    import mimetypes  # local import: the file's import block is left untouched

    if not os.path.exists(image_path):
        raise FileNotFoundError(f"Image not found: {image_path}")
    with open(image_path, "rb") as f:
        b64 = base64.b64encode(f.read()).decode("utf-8")
    mime, _encoding = mimetypes.guess_type(image_path)
    if not mime or not mime.startswith("image/"):
        mime = "image/png"
    return f"data:{mime};base64,{b64}"
def build_messages(image_path: str, instruction: str, system_prompt: str) -> List[Dict]:
    """Build the two-message chat payload (system + user) for one request.

    The user message carries the image as a data URI followed by the
    instruction text.
    """
    image_part = {"type": "image", "image": image_file_to_data_uri(image_path)}
    text_part = {"type": "text", "text": instruction}
    system_message = {"role": "system", "content": system_prompt}
    user_message = {"role": "user", "content": [image_part, text_part]}
    return [system_message, user_message]
def call_health(base_url: str, timeout: float = 10.0) -> Dict:
    """GET ``{base_url}/health`` and return the decoded JSON body.

    Raises whatever ``raise_for_status`` raises on a non-success HTTP status.
    """
    health_response = requests.get(f"{base_url}/health", timeout=timeout)
    health_response.raise_for_status()
    return health_response.json()
def call_single(
    base_url: str,
    image_path: str,
    instruction: str,
    system_prompt: str,
    max_new_tokens: int = 512,
    temperature: float = 0.0,
    top_p: float = 0.9,
    timeout: float = 120.0,
) -> List[Dict]:
    """POST one inference request to ``{base_url}/call_llm``.

    Returns:
        The decoded JSON reply as a list; a single-dict reply is wrapped in a
        one-element list so callers always receive a list.
    """
    request_body = dict(
        messages=build_messages(image_path, instruction, system_prompt),
        max_new_tokens=max_new_tokens,
        temperature=temperature,
        top_p=top_p,
    )
    http_response = requests.post(f"{base_url}/call_llm", json=request_body, timeout=timeout)
    http_response.raise_for_status()
    decoded = http_response.json()
    return [decoded] if isinstance(decoded, dict) else decoded
def call_many_concurrent(
    base_url: str,
    image_path: str,
    instruction: str,
    system_prompt: str,
    num_requests: int,
    concurrency: int,
    max_new_tokens: int = 512,
    temperature: float = 0.0,
    top_p: float = 0.9,
    timeout: float = 120.0,
) -> List[List[Dict]]:
    """Send ``num_requests`` inference calls through a thread pool.

    Each request's instruction gets a ``[req i/N]`` suffix so it can be
    traced.

    Returns:
        One result list per request, in submission order: element ``i``
        belongs to request ``i + 1``, regardless of which call finished
        first. An empty list is returned when ``num_requests`` is 0.

    Raises:
        The exception of the lowest-numbered failed request, if any.
    """

    def _one(i: int) -> List[Dict]:
        # Vary instruction slightly so you can trace requests
        instr = f"{instruction} [req {i+1}/{num_requests}]"
        return call_single(
            base_url,
            image_path,
            instr,
            system_prompt,
            max_new_tokens,
            temperature,
            top_p,
            timeout,
        )

    with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as pool:
        futures = [pool.submit(_one, i) for i in range(num_requests)]
        # Read the futures in the order they were submitted, not in completion
        # order, so results stay aligned with request numbers.
        return [fut.result() for fut in futures]
def pretty_print_response(batch_results: List[Dict]) -> None:
    """Print each result item: its error as JSON, or its GPU id, usage and text.

    Args:
        batch_results: A list of result dicts; a single dict is also accepted
            and treated as a one-element list.
    """
    if isinstance(batch_results, dict):
        batch_results = [batch_results]
    for idx, item in enumerate(batch_results):
        error = item.get("error")
        if error:
            print(f"[#{idx}] ERROR: {json.dumps(error, ensure_ascii=False)}")
            continue
        usage = item.get("usage", {})
        # Tolerate an explicit null response as well as a missing key.
        text = (item.get("response") or "").strip()
        print(f"[#{idx}] gpu={item.get('gpu_id')} tokens={usage} text=\n{text}\n")
# pyautogui.click(x=<int>, y=<int>)
CLICK_KWARGS_REGEX = re.compile(r"pyautogui\.click\(\s*x\s*=\s*(\d+)\s*,\s*y\s*=\s*(\d+)\s*\)")
# pyautogui.click(<int>, <int>)
CLICK_POSARGS_REGEX = re.compile(r"pyautogui\.click\(\s*(\d+)\s*,\s*(\d+)\s*\)")


def extract_clicks_from_text(text: str) -> List[Tuple[int, int]]:
    """Return every ``pyautogui.click`` coordinate found in ``text``.

    Keyword-form matches come first, followed by positional-form matches;
    within each form the order follows the text. A falsy ``text`` yields an
    empty list.
    """
    haystack = text or ""
    return [
        (int(x), int(y))
        for pattern in (CLICK_KWARGS_REGEX, CLICK_POSARGS_REGEX)
        for x, y in pattern.findall(haystack)
    ]
def extract_clicks_from_results(result_items: List[Dict]) -> List[Tuple[int, int]]:
    """Gather click coordinates from the ``response`` text of error-free items.

    A single dict is accepted and treated as a one-element list. Items with a
    truthy ``error`` are skipped.
    """
    items = [result_items] if isinstance(result_items, dict) else result_items
    found: List[Tuple[int, int]] = []
    for entry in items:
        if not entry.get("error"):
            found.extend(extract_clicks_from_text(entry.get("response", "")))
    return found
def compute_resized_dims_for_server_mapping(image_path: str) -> Tuple[int, int, int, int]:
    """Return ``(width, height, resized_width, resized_height)`` for an image.

    The resized size comes from ``smart_resize`` with factor 28 and the fixed
    pixel bounds below.
    NOTE(review): presumably these values are meant to mirror the resize the
    serving side applies — confirm against the model's processor config.
    """
    with Image.open(image_path) as source:
        orig_w, orig_h = source.size
    # smart_resize takes and returns (height, width).
    new_h, new_w = smart_resize(
        orig_h,
        orig_w,
        factor=28,
        min_pixels=1000,
        max_pixels=1000000000000,
    )
    return orig_w, orig_h, int(new_w), int(new_h)
def map_clicks_to_original(clicks_resized: List[Tuple[int, int]],
                           original_w: int,
                           original_h: int,
                           resized_w: int,
                           resized_h: int) -> List[Tuple[int, int]]:
    """Scale click coordinates from the resized image back to the original.

    Each coordinate is multiplied by the original/resized ratio for its axis
    and rounded to the nearest int. Returns an empty list when either resized
    dimension is 0.
    """
    if not (resized_w != 0 and resized_h != 0):
        return []
    ratio_x = original_w / float(resized_w)
    ratio_y = original_h / float(resized_h)
    return [
        (int(round(px * ratio_x)), int(round(py * ratio_y)))
        for px, py in clicks_resized
    ]
def draw_circles_on_image(image_path: str,
                          points: List[Tuple[int, int]],
                          output_path: str,
                          radius: int = 8,
                          color: Tuple[int, int, int] = (255, 0, 0),
                          width: int = 3) -> None:
    """Draw a filled circle at each point and save the annotated image.

    Args:
        image_path: Source image to annotate.
        points: Circle centres in source-image pixels. Nothing is read or
            written when this is empty.
        output_path: Destination file. Its parent directory is created if it
            does not exist yet.
        radius: Circle radius in pixels.
        color: Outline colour (the fill is a fixed green).
        width: Outline width in pixels.
    """
    if not points:
        return
    # Saving into a missing directory would fail, so create it up front.
    out_dir = os.path.dirname(output_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    # convert() returns a new image, so the file opened here can be closed
    # as soon as the RGB copy exists.
    with Image.open(image_path) as source:
        img = source.convert("RGB")
    drawer = ImageDraw.Draw(img)
    for (x, y) in points:
        bounds = [(x - radius, y - radius), (x + radius, y + radius)]
        drawer.ellipse(bounds, outline=color, fill=(0,255,0), width=width)
    img.save(output_path)
    print(f"Annotated image saved to: {output_path} (points drawn: {len(points)})")
# Default system prompt sent with every request unless --system overrides it.
SYSTEM_PROMPT = "".join(
    [
        "You are a GUI agent. You are given a task and a screenshot of the screen. ",
        "You need to perform a series of pyautogui actions to complete the task.",
    ]
)
def _annotate_clicks(image_path: str,
                     clicks_resized: List[Tuple[int, int]],
                     out_path: str = "ray_serve/annotated.png") -> None:
    """Map resized-space clicks onto the original image and save an annotated copy."""
    if not clicks_resized:
        return
    orig_w, orig_h, resized_w, resized_h = compute_resized_dims_for_server_mapping(image_path)
    mapped_clicks = map_clicks_to_original(clicks_resized, orig_w, orig_h, resized_w, resized_h)
    draw_circles_on_image(image_path, mapped_clicks, out_path)


def main():
    """Command-line client for the GTA1-32B Ray Serve endpoint.

    Modes:
        health: print the server's /health reply as JSON.
        single: send one request, print the reply, annotate predicted clicks.
        batch: send ``--num_requests`` concurrent requests, print every
            reply, annotate all predicted clicks on one image.
    """
    parser = argparse.ArgumentParser(description="Examples: single and batched inference against GTA1-32B Ray Serve.")
    parser.add_argument("--host", type=str, default="http://localhost", help="Ray Serve host, e.g. http://localhost or http://IP")
    parser.add_argument("--port", type=int, default=3005, help="Ray Serve port")
    parser.add_argument("--image", type=str, required=False, default="example.jpg", help="Path to input image")
    parser.add_argument("--instruction", type=str, default="click the icon in the bottom row, third from the left", help="User instruction")
    parser.add_argument("--system", type=str, default=SYSTEM_PROMPT)
    parser.add_argument("--mode", type=str, choices=["single", "batch", "health"], default="batch")
    parser.add_argument("--num_requests", type=int, default=8, help="Number of requests in batch mode")
    parser.add_argument("--concurrency", type=int, default=8, help="Max concurrent HTTP calls in batch mode")
    parser.add_argument("--max_new_tokens", type=int, default=512)
    parser.add_argument("--temperature", type=float, default=0.0)
    parser.add_argument("--top_p", type=float, default=0.9)
    parser.add_argument("--timeout", type=float, default=180.0)
    args = parser.parse_args()
    base_url = f"{args.host}:{args.port}"

    if args.mode == "health":
        info = call_health(base_url, timeout=10.0)
        print(json.dumps(info, indent=2))
        return

    if args.mode == "single":
        result_list = call_single(
            base_url=base_url,
            image_path=args.image,
            instruction=args.instruction,
            system_prompt=args.system,
            max_new_tokens=args.max_new_tokens,
            temperature=args.temperature,
            top_p=args.top_p,
            timeout=args.timeout,
        )
        print(result_list)
        pretty_print_response(result_list)
        _annotate_clicks(args.image, extract_clicks_from_results(result_list))
        return

    if args.mode == "batch":
        print(f"Submitting {args.num_requests} requests with concurrency={args.concurrency}...")
        batch_outs = call_many_concurrent(
            base_url=base_url,
            image_path=args.image,
            instruction=args.instruction,
            system_prompt=args.system,
            num_requests=args.num_requests,
            concurrency=args.concurrency,
            max_new_tokens=args.max_new_tokens,
            temperature=args.temperature,
            top_p=args.top_p,
            timeout=args.timeout,
        )
        for i, one_result in enumerate(batch_outs):
            print(f"===== Result for request {i+1} =====")
            pretty_print_response(one_result)
        # Pool the clicks from every reply and draw them on a single image.
        all_clicks_resized: List[Tuple[int, int]] = []
        for one_result in batch_outs:
            all_clicks_resized.extend(extract_clicks_from_results(one_result))
        _annotate_clicks(args.image, all_clicks_resized)
if __name__ == "__main__":
    main()

Deploy This Model
Production-ready deployment in minutes
Together.ai
Instant API access to this model
Production-ready inference API. Start free, scale to millions.
Try Free API

Replicate
One-click model deployment
Run models in the cloud with simple API. No DevOps required.
Deploy Now

Disclosure: We may earn a commission from these partners. This helps keep LLMYourWay free.