A loop that fires requests in sequence stalls the moment your workload grows past a few hundred calls. Valar gives you two ways to clear large volumes: fan out many concurrent Responses API calls in background mode, or hand a single bundle to the Batch API and let Valar work through it. This page covers both, starting with how to choose.

Which path fits your workload

Concurrent Responses API

You want results streaming back as each request finishes, need per-request control, or care about latency on individual items. You manage concurrency on your client.

Batch API

You have a fixed set of requests and would rather submit once and collect results later. Valar owns the queue; you poll for status.
The running example throughout is sentiment scoring for a backlog of product reviews, using zai-org/GLM-5.1-FP8.

Fan out with the Responses API

Submit each request individually with background=True and run many in flight at once. For workloads of roughly 1,000 requests and up, four practices keep it stable:
  • Use AsyncOpenAI with DefaultAioHttpClient(). Under high concurrency the SDK’s aiohttp backend outperforms the default httpx transport.
  • Bound concurrency with an asyncio.Semaphore. A fixed limit — 200 is a sound starting point — caps simultaneous connections so you never exhaust them.
  • Submit all requests behind the semaphore, collect the response IDs, then poll in a second pass. Separating submission from polling keeps both phases simple.
  • Attach an Idempotency-Key per request so a retry after a transient failure replays the reservation rather than paying for inference twice. See Idempotent Requests.
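One way to satisfy the last practice is to derive the key from something stable about the request, so that a retry sends the same key as the first attempt. The sketch below is only an illustration: the helper names `idempotency_key` and `request_options` and the `JOB_ID` label are hypothetical, and the key format Valar accepts is assumed here to be any opaque string (Idempotent Requests is the authority on that). `extra_headers` is the OpenAI Python SDK's per-call option for adding request headers.

```python
import uuid

# Hypothetical job label: any stable string that identifies this backlog run.
JOB_ID = "review-sentiment-backlog"


def idempotency_key(job_id: str, index: int) -> str:
    """Derive a stable key for request `index` of job `job_id`.

    uuid5 is deterministic, so retrying the same request yields the same key,
    while different requests get different keys.
    """
    return str(uuid.uuid5(uuid.NAMESPACE_URL, f"{job_id}/{index}"))


def request_options(job_id: str, index: int) -> dict:
    """Keyword arguments to pass along to client.responses.create(...)."""
    return {"extra_headers": {"Idempotency-Key": idempotency_key(job_id, index)}}
```

In a submit coroutine, the call would then look like `client.responses.create(model=..., input=..., background=True, **request_options(JOB_ID, i))`. A random `uuid4` per request also works, provided the value is stored and reused on retry rather than regenerated.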
Install the dependencies first:
pip install tqdm 'openai[aiohttp]'
The script submits every review concurrently, gathers the response IDs, and then polls each one to completion before printing its score:
import argparse
import asyncio

from tqdm import tqdm
from openai import AsyncOpenAI, DefaultAioHttpClient

REVIEW_TEMPLATE = (
    "Classify the sentiment of this product review as positive, negative, or "
    "neutral. Reply with one word.\n\nReview {i}: {text}"
)


async def main(num_requests: int, max_output_tokens: int, model: str):
    async with AsyncOpenAI(
        base_url="https://api.valarhq.ai/v1",
        api_key="YOUR_KEY_HERE",
        http_client=DefaultAioHttpClient(),
    ) as client:
        # Confirm the model is available in this environment.
        models = await client.models.list()
        print("Supported models:", [m.id for m in models.data])

        submit_sem = asyncio.Semaphore(200)
        submit_bar = tqdm(total=num_requests, desc="Submitting")

        async def submit(i):
            content = REVIEW_TEMPLATE.format(
                i=i, text=f"Sample review body number {i} goes here."
            )
            async with submit_sem:
                response = await client.responses.create(
                    model=model,
                    input=[{"role": "user", "content": content}],
                    max_output_tokens=max_output_tokens,
                    background=True,
                )
            submit_bar.update(1)
            return response.id

        response_ids = list(await asyncio.gather(*[submit(i) for i in range(num_requests)]))
        submit_bar.close()
        print(f"Submitted {len(response_ids)} requests")

        # Second pass: poll each background response until it completes.
        completed = {}
        poll_sem = asyncio.Semaphore(200)
        poll_bar = tqdm(total=len(response_ids), desc="Polling")

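        async def poll(response_id):
            for _ in range(3600):  # up to ~1 hour
                async with poll_sem:
                    response = await client.responses.retrieve(response_id)
                if response.status == "completed":
                    completed[response_id] = response
                    poll_bar.update(1)
                    return
                # Stop on the other terminal states rather than polling until the
                # timeout. Assumes the same status names as the OpenAI Responses API.
                if response.status in ("failed", "cancelled", "incomplete"):
                    poll_bar.update(1)
                    tqdm.write(f"{response_id} ended with status {response.status}")
                    return
                await asyncio.sleep(1)
            tqdm.write(f"Timed out waiting for {response_id}")

        await asyncio.gather(*[poll(rid) for rid in response_ids])
        poll_bar.close()

        for response_id in response_ids:
            resp = completed.get(response_id)
            if resp is None:
                # Failed, cancelled, incomplete, or timed-out: nothing to print.
                continue
            text = "".join(
                c.text
                for item in resp.output
                for c in getattr(item, "content", []) or []
                if getattr(c, "type", None) == "output_text"
            )
            print(f"{response_id}: {text.strip()}")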


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--num-requests", type=int, default=20000)
    parser.add_argument("--max-output-tokens", type=int, default=16)
    parser.add_argument("--model", type=str, default="zai-org/GLM-5.1-FP8")
    args = parser.parse_args()

    asyncio.run(
        main(
            num_requests=args.num_requests,
            max_output_tokens=args.max_output_tokens,
            model=args.model,
        )
    )
Treat a semaphore of 200 as the baseline. Lower it if connection errors or timeouts appear; raise it when you have headroom and want submissions to move through faster.
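Lowering the limit handles sustained pressure, but an occasional connection error or timeout is usually better absorbed by retrying the individual call. The helper below is a minimal sketch of exponential backoff with jitter, not part of Valar's or the SDK's API: `with_retries` and its parameters are hypothetical names, and which exception types count as transient is left to the caller.

```python
import asyncio
import random


async def with_retries(make_call, *, retry_on, attempts=4, base_delay=0.5):
    """Await make_call() and retry when it raises one of `retry_on`.

    `make_call` is a zero-argument callable that returns a fresh awaitable on
    every invocation. Between attempts the helper sleeps for
    base_delay * 2**n seconds plus random jitter, and it re-raises the last
    error once `attempts` calls have failed.
    """
    for attempt in range(attempts):
        try:
            return await make_call()
        except retry_on:
            if attempt == attempts - 1:
                raise
            delay = base_delay * (2 ** attempt)
            await asyncio.sleep(delay + random.uniform(0, delay))
```

Inside `submit`, the call could be wrapped as `await with_retries(lambda: client.responses.create(...), retry_on=(openai.APIConnectionError, openai.APITimeoutError))`, keeping the wrapper inside the `async with submit_sem:` block so retries still count against the limit. The SDK also retries some failures on its own (see the client's `max_retries` option), so this layer mainly helps when those are exhausted. Reuse the same Idempotency-Key on every attempt so a replay does not pay for inference twice.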

Hand off a bundle with the Batch API

Coming soon. The Batch API isn’t generally available yet. Until it ships, fan out through the Responses API with background=True for large workloads.
The Batch API takes up to 100,000 requests in one call, with a ceiling of 256 MB per batch. The lifecycle is three operations:
  1. Submit: bundle every request into a single POST /batches.
  2. Poll: call GET /batches/{batch_id} until every request has settled.
  3. Retrieve: pull each result with GET /batches/{batch_id}/{custom_id}.
To review everything you’ve submitted previously, call GET /batches. Include an Idempotency-Key header on submission so a client retry after a dropped connection replays your original reservation instead of creating a duplicate batch. See Idempotent Requests.
Batch items run on the standard window, and that’s the default if you omit the field — the low-latency Now (asap) tier isn’t available for batch. When latency matters, fan out through the Responses API instead. See Inference modes for the realtime-versus-async-versus-batch tradeoff, and Completion windows for per-model support.
This version scores the same review backlog as one batch:
import time
import requests

BASE_URL = "https://api.valarhq.ai/v1"
API_KEY = "YOUR_KEY_HERE"
HEADERS = {"Authorization": f"Bearer {API_KEY}"}

# 1. Submit a batch
batch_requests = [
    {
        "custom_id": f"review-{i}",
        "params": {
            "model": "zai-org/GLM-5.1-FP8",
            "max_output_tokens": 16,
            "input": [
                {
                    "role": "user",
                    "content": (
                        "Classify the sentiment of this review as positive, "
                        f"negative, or neutral. Review {i}: Sample review body {i}."
                    ),
                }
            ],
            "metadata": {"completion_window": "standard"},
        },
    }
    for i in range(100)
]

resp = requests.post(
    f"{BASE_URL}/batches",
    headers=HEADERS,
    json={
        "endpoint": "/v1/responses",
        "label": "review-sentiment",
        "requests": batch_requests,
    },
)
resp.raise_for_status()  # fail fast on a rejected submission
batch = resp.json()
batch_id = batch["id"]
print(f"Created batch {batch_id} with {batch['request_counts']} requests")

# 2. Poll until every request has settled
while True:
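    status_resp = requests.get(f"{BASE_URL}/batches/{batch_id}", headers=HEADERS)
    status_resp.raise_for_status()  # surface HTTP errors instead of a KeyError below
    status = status_resp.json()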
    request_status = status["request_status"]
    done = sum(
        1 for r in request_status.values()
        if r["status"] in ("COMPLETED", "FAILED", "CANCELLED")
    )
    print(f"Progress: {done}/{len(request_status)}")
    if done == len(request_status):
        break
    time.sleep(5)

# 3. Retrieve results
for custom_id, info in request_status.items():
    if info["status"] != "COMPLETED":
        continue
    result = requests.get(
        f"{BASE_URL}/batches/{batch_id}/{custom_id}", headers=HEADERS
    ).json()
    output_text = "".join(
        c["text"]
        for item in result.get("output", [])
        for c in item.get("content", [])
        if c.get("type") == "output_text"
    )
    print(f"{custom_id}: {output_text.strip()}")
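A backlog larger than the stated limits (100,000 requests or 256 MB per batch) has to be split across several submissions. The generator below is a rough sketch of that split. The name `chunk_requests` is hypothetical, and two details are assumptions rather than documented behavior: that 256 MB can be read conservatively as 256,000,000 bytes, and that size is measured on the JSON-serialized requests, ignoring the small envelope around them.

```python
import json

MAX_REQUESTS_PER_BATCH = 100_000
# Assumption: read "256 MB" conservatively as 256 * 1000 * 1000 bytes of JSON.
MAX_BYTES_PER_BATCH = 256 * 1000 * 1000


def chunk_requests(
    requests_list,
    max_requests=MAX_REQUESTS_PER_BATCH,
    max_bytes=MAX_BYTES_PER_BATCH,
):
    """Yield lists of requests that each stay within both limits."""
    chunk, chunk_bytes = [], 0
    for req in requests_list:
        # +1 approximates the separator between items in the JSON array.
        size = len(json.dumps(req).encode("utf-8")) + 1
        if size > max_bytes:
            raise ValueError(
                f"request {req.get('custom_id')!r} exceeds the byte limit on its own"
            )
        # Start a new chunk when adding this request would break either limit.
        if chunk and (len(chunk) >= max_requests or chunk_bytes + size > max_bytes):
            yield chunk
            chunk, chunk_bytes = [], 0
        chunk.append(req)
        chunk_bytes += size
    if chunk:
        yield chunk
```

Each yielded list can be posted as its own batch, for example `for part in chunk_requests(batch_requests): ...`, with a distinct Idempotency-Key per part. Leaving some headroom below the byte limit is prudent, since the estimate does not count the outer request body.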