A loop that fires requests in sequence stalls the moment your workload grows past a few hundred calls. Valar gives you two ways to clear large volumes: fan out many concurrent Responses API calls in background mode, or hand a single bundle to the Batch API and let Valar work through it. This page covers both, starting with how to choose.
Which path fits your workload
Concurrent Responses API
You want results streaming back as each request finishes, need per-request control, or care about latency on individual items. You manage concurrency on your client.
Batch API
You have a fixed set of requests and would rather submit once and collect results later. Valar owns the queue; you poll for status.
The running example throughout is sentiment scoring for a backlog of product reviews, using zai-org/GLM-5.1-FP8.
Fan out with the Responses API
Submit each request individually with background=True and run many in flight at once. For workloads of roughly 1,000 requests and up, four practices keep it stable:
- Use AsyncOpenAI with DefaultAioHttpClient(). Under high concurrency the SDK’s aiohttp backend outperforms the default httpx transport.
- Bound concurrency with an asyncio.Semaphore. A fixed limit (200 is a sound starting point) caps simultaneous connections so you never exhaust them.
- Submit all requests behind the semaphore, collect the response IDs, then poll in a second pass. Separating submission from polling keeps both phases simple.
- Attach an Idempotency-Key per request so a retry after a transient failure replays the reservation rather than paying for inference twice. See Idempotent Requests.
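The idempotency practice only helps if every retry of the same logical request carries the same key. A minimal sketch of that idea follows. `submit_with_retry` is a hypothetical helper, not part of Valar or the SDK; the UUID key format and the retry schedule are assumptions. It relies on the `extra_headers` per-request option that the OpenAI Python SDK accepts on calls such as `client.responses.create`.

```python
import asyncio
import uuid


async def submit_with_retry(create, payload, *, attempts=3, base_delay=0.5):
    """Call `create` with one Idempotency-Key that is reused on every retry.

    `create` is any async callable that accepts `extra_headers`, for example
    `client.responses.create` (hypothetical wiring, shown for illustration).
    """
    # Generate the key once per logical request, outside the retry loop.
    key = str(uuid.uuid4())  # assumption: any unique string is an acceptable key
    for attempt in range(attempts):
        try:
            return await create(**payload, extra_headers={"Idempotency-Key": key})
        except Exception:
            # In real code, catch only transient errors (connection resets,
            # timeouts) rather than every exception.
            if attempt == attempts - 1:
                raise
            await asyncio.sleep(base_delay * 2 ** attempt)  # exponential backoff
```

Inside a submit coroutine this would be called as `await submit_with_retry(client.responses.create, {"model": model, "input": [...], "background": True})`. Generating the key inside the loop instead would defeat the purpose, because each attempt would then look like a new request.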
Install the dependencies first:
pip install tqdm 'openai[aiohttp]'
The script submits every review concurrently, gathers the response IDs, and then polls each one to completion before printing its score:
import argparse
import asyncio

from tqdm import tqdm
from openai import AsyncOpenAI, DefaultAioHttpClient

REVIEW_TEMPLATE = (
    "Classify the sentiment of this product review as positive, negative, or "
    "neutral. Reply with one word.\n\nReview {i}: {text}"
)

# Statuses after which a background response will not change again.
# Assumed to match the status values of the OpenAI Responses API.
TERMINAL_STATUSES = {"completed", "failed", "cancelled", "incomplete"}


async def main(num_requests: int, max_output_tokens: int, model: str):
    async with AsyncOpenAI(
        base_url="https://api.valarhq.ai/v1",
        api_key="YOUR_KEY_HERE",
        http_client=DefaultAioHttpClient(),
    ) as client:
        # Confirm the model is available in this environment.
        models = await client.models.list()
        print("Supported models:", [m.id for m in models.data])

        # First pass: submit everything, at most 200 requests in flight.
        submit_sem = asyncio.Semaphore(200)
        submit_bar = tqdm(total=num_requests, desc="Submitting")

        async def submit(i):
            content = REVIEW_TEMPLATE.format(
                i=i, text=f"Sample review body number {i} goes here."
            )
            async with submit_sem:
                response = await client.responses.create(
                    model=model,
                    input=[{"role": "user", "content": content}],
                    max_output_tokens=max_output_tokens,
                    background=True,
                )
            submit_bar.update(1)
            return response.id

        response_ids = list(await asyncio.gather(*[submit(i) for i in range(num_requests)]))
        submit_bar.close()
        print(f"Submitted {len(response_ids)} requests")

        # Second pass: poll each background response until it reaches a terminal status.
        finished = {}
        poll_sem = asyncio.Semaphore(200)
        poll_bar = tqdm(total=len(response_ids), desc="Polling")

        async def poll(response_id):
            for _ in range(3600):  # up to ~1 hour at one poll per second
                async with poll_sem:
                    response = await client.responses.retrieve(response_id)
                if response.status in TERMINAL_STATUSES:
                    finished[response_id] = response
                    poll_bar.update(1)
                    return
                await asyncio.sleep(1)
            print(f"Timed out waiting for {response_id}")

        await asyncio.gather(*[poll(rid) for rid in response_ids])
        poll_bar.close()

        for response_id in response_ids:
            resp = finished.get(response_id)
            if resp is None:
                continue  # timed out; already reported by poll()
            if resp.status != "completed":
                print(f"{response_id}: ended with status {resp.status}")
                continue
            # Concatenate every output_text part of the response.
            text = "".join(
                c.text
                for item in resp.output
                for c in getattr(item, "content", []) or []
                if getattr(c, "type", None) == "output_text"
            )
            print(f"{response_id}: {text.strip()}")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--num-requests", type=int, default=20000)
    parser.add_argument("--max-output-tokens", type=int, default=16)
    parser.add_argument("--model", type=str, default="zai-org/GLM-5.1-FP8")
    args = parser.parse_args()
    asyncio.run(
        main(
            num_requests=args.num_requests,
            max_output_tokens=args.max_output_tokens,
            model=args.model,
        )
    )
Treat a semaphore of 200 as the baseline. Lower it if connection errors or timeouts appear; raise it when you have headroom and want submissions to move through faster.
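To see what the limit actually controls, the sketch below runs dummy jobs behind a semaphore and reports the peak number in flight. It makes no network calls; `run_bounded` and the 10 ms sleep are illustrative stand-ins, not part of the script above.

```python
import asyncio


async def run_bounded(n_tasks: int, limit: int) -> int:
    """Run n_tasks dummy jobs behind a semaphore and return the peak in flight."""
    sem = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def job():
        nonlocal in_flight, peak
        async with sem:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for a network round trip
            in_flight -= 1

    # All jobs are scheduled at once; the semaphore decides how many proceed.
    await asyncio.gather(*(job() for _ in range(n_tasks)))
    return peak


if __name__ == "__main__":
    print(asyncio.run(run_bounded(n_tasks=50, limit=5)))
```

However many tasks are scheduled, the peak never exceeds the limit, which is why changing the single semaphore value is enough to tune load on the connection pool.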
Hand off a bundle with the Batch API
Coming soon. The Batch API isn’t generally available yet. Until it ships, fan out through the Responses API with background=True for large workloads.
The Batch API takes up to 100,000 requests in one call, with a ceiling of 256 MB per batch. The lifecycle is three operations:
Submit
Bundle every request into a single POST /batches.
Poll
Call GET /batches/{batch_id} until every request has settled.
Retrieve
Pull each result with GET /batches/{batch_id}/{custom_id}.
To review everything you’ve submitted previously, call GET /batches. Include an Idempotency-Key header on submission so a client retry after a dropped connection replays your original reservation instead of creating a duplicate batch. See Idempotent Requests.
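One way to make a retried submission carry the same key is to derive the key from the batch content rather than generating it at random. The sketch below is an illustration under assumptions: `batch_idempotency_key` is a hypothetical helper, and it assumes a 64-character hex string is an acceptable Idempotency-Key value, which the text above does not specify.

```python
import hashlib
import json


def batch_idempotency_key(label: str, batch_requests: list) -> str:
    """Derive a stable key from the batch label and its requests.

    The same label and requests always produce the same key, so a client
    that crashes and resubmits does not need to have stored the key.
    """
    canonical = json.dumps(
        {"label": label, "requests": batch_requests},
        sort_keys=True,          # key order must not change the hash
        separators=(",", ":"),   # fixed whitespace for a stable encoding
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


if __name__ == "__main__":
    reqs = [{"custom_id": "review-0", "params": {"max_output_tokens": 16}}]
    print(batch_idempotency_key("review-sentiment", reqs))
```

The resulting string would be sent as the `Idempotency-Key` header on the `POST /batches` call. The tradeoff of a content-derived key is that deliberately resubmitting an identical batch also replays the original, so change the label when a fresh run is intended.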
Batch items run on the standard completion window, which is also the default if you omit completion_window. The low-latency Now (asap) tier isn’t available for batch. When latency matters, fan out through the Responses API instead. See Inference modes for the realtime-versus-async-versus-batch tradeoff, and Completion windows for per-model support.
This version scores the same review backlog as one batch:
import time

import requests

BASE_URL = "https://api.valarhq.ai/v1"
API_KEY = "YOUR_KEY_HERE"
HEADERS = {"Authorization": f"Bearer {API_KEY}"}

# 1. Submit a batch
batch_requests = [
    {
        "custom_id": f"review-{i}",
        "params": {
            "model": "zai-org/GLM-5.1-FP8",
            "max_output_tokens": 16,
            "input": [
                {
                    "role": "user",
                    "content": (
                        "Classify the sentiment of this review as positive, "
                        f"negative, or neutral. Review {i}: Sample review body {i}."
                    ),
                }
            ],
            "metadata": {"completion_window": "standard"},
        },
    }
    for i in range(100)
]

resp = requests.post(
    f"{BASE_URL}/batches",
    headers=HEADERS,
    json={
        "endpoint": "/v1/responses",
        "label": "review-sentiment",
        "requests": batch_requests,
    },
)
resp.raise_for_status()  # stop here if the submission was rejected
batch = resp.json()
batch_id = batch["id"]
print(f"Created batch {batch_id} with {batch['request_counts']} requests")

# 2. Poll until every request has settled
while True:
    status = requests.get(f"{BASE_URL}/batches/{batch_id}", headers=HEADERS).json()
    request_status = status["request_status"]
    done = sum(
        1 for r in request_status.values()
        if r["status"] in ("COMPLETED", "FAILED", "CANCELLED")
    )
    print(f"Progress: {done}/{len(request_status)}")
    if done == len(request_status):
        break
    time.sleep(5)

# 3. Retrieve results (only requests that completed have output)
for custom_id, info in request_status.items():
    if info["status"] != "COMPLETED":
        continue
    result = requests.get(
        f"{BASE_URL}/batches/{batch_id}/{custom_id}", headers=HEADERS
    ).json()
    output_text = "".join(
        c["text"]
        for item in result.get("output", [])
        for c in item.get("content", [])
        if c.get("type") == "output_text"
    )
    print(f"{custom_id}: {output_text.strip()}")
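The script above sends 100 requests in one call. A backlog that might exceed the limits stated earlier (100,000 requests and 256 MB per batch) has to be split first, and a helper along these lines is one way to do it. `chunk_requests` is a hypothetical helper. How the 256 MB ceiling is measured is not stated here, so the sketch assumes decimal megabytes counted on the UTF-8 encoded JSON of each request.

```python
import json

MAX_REQUESTS = 100_000
MAX_BYTES = 256 * 1000 * 1000  # assumption: decimal MB, measured on the JSON body


def chunk_requests(requests_list, max_requests=MAX_REQUESTS, max_bytes=MAX_BYTES):
    """Yield lists of requests that each stay within both batch limits."""
    chunk, size = [], 0
    for req in requests_list:
        # Approximate this request's share of the body; +1 covers the separator.
        req_size = len(json.dumps(req).encode("utf-8")) + 1
        if req_size > max_bytes:
            raise ValueError("a single request exceeds the per-batch size limit")
        # Close the current chunk if adding this request would break a limit.
        if chunk and (len(chunk) >= max_requests or size + req_size > max_bytes):
            yield chunk
            chunk, size = [], 0
        chunk.append(req)
        size += req_size
    if chunk:
        yield chunk


if __name__ == "__main__":
    demo = [{"custom_id": f"review-{i}"} for i in range(250)]
    print([len(c) for c in chunk_requests(demo, max_requests=100)])
```

Each yielded chunk would then be submitted as its own `POST /batches` call. Because the size estimate ignores the envelope fields such as `endpoint` and `label`, leaving some headroom below the real ceiling is prudent.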