Python Endpoints

This guide explains how to use the Tensor One Python SDK to run, monitor, and manage serverless endpoints using synchronous, asynchronous, and streaming methods.

Prerequisites

Before using the SDK:
  • Install the tensorone Python SDK
  • Set your API key
  • Initialize the endpoint
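Assuming the package is published under the same name as the import used below (this is an assumption; check your distribution channel), installation and key setup are two commands:

```shell
# Package name assumed to match the "import tensorone" statement below
pip install tensorone
# Export the key so os.getenv("TENSORONE_API_KEY") can pick it up
export TENSORONE_API_KEY="your-api-key"
```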

Setup Example

import tensorone
import os

tensorone.api_key = os.getenv("TENSORONE_API_KEY")
endpoint = tensorone.Endpoint("YOUR_ENDPOINT_ID")

Run Synchronously

Use run_sync() to execute the endpoint and block until completion or timeout:
try:
    run_request = endpoint.run_sync({
        "input": { "prompt": "Hello, world!" }
    }, timeout=60)
    print(run_request)
except TimeoutError:
    print("Job timed out.")

Run Asynchronously (Standard Python)

Use run() to submit a job without blocking, then poll its status and fetch the output:
input_payload = {"input": {"prompt": "Hello, World!"}}

try:
    run_request = endpoint.run(input_payload)
    status = run_request.status()
    print(f"Initial job status: {status}")

    if status != "COMPLETED":
        # Job is still running; block for up to 60 seconds waiting for the output
        output = run_request.output(timeout=60)
    else:
        output = run_request.output()

    print(f"Job output: {output}")
except Exception as e:
    print(f"An error occurred: {e}")
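The poll-until-done pattern above can be factored into a small helper. This sketch is SDK-agnostic: it takes any zero-argument status function, so passing run_request.status should work, assuming the status strings shown in this guide (the terminal-status set below is an assumption; adjust it to your endpoint's actual values):

```python
import time

# Assumed terminal statuses; adjust to match your endpoint's actual values
TERMINAL_STATUSES = {"COMPLETED", "FAILED", "CANCELLED", "TIMED_OUT"}

def poll_until_terminal(get_status, interval=3, timeout=120):
    """Call get_status() until it returns a terminal status or the timeout expires."""
    deadline = time.monotonic() + timeout
    status = get_status()
    while status not in TERMINAL_STATUSES:
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job still {status!r} after {timeout}s")
        time.sleep(interval)
        status = get_status()
    return status

# Simulated status sequence standing in for run_request.status
statuses = iter(["IN_QUEUE", "IN_PROGRESS", "COMPLETED"])
print(poll_until_terminal(lambda: next(statuses), interval=0, timeout=5))
```

Separating the loop from the SDK object also makes the polling logic easy to unit-test, as the simulated sequence above shows.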

Run Asynchronously with asyncio

For fully non-blocking workflows, use AsyncioEndpoint with an aiohttp session:
import asyncio
import aiohttp
from tensorone import AsyncioEndpoint, AsyncioJob

async def main():
    async with aiohttp.ClientSession() as session:
        input_payload = {"prompt": "Hello, World!"}
        endpoint = AsyncioEndpoint("YOUR_ENDPOINT_ID", session)
        job: AsyncioJob = await endpoint.run(input_payload)

        while True:
            status = await job.status()
            print(f"Current job status: {status}")

            if status == "COMPLETED":
                output = await job.output()
                print("Job output:", output)
                break
            elif status == "FAILED":
                print("Job failed.")
                break
            else:
                await asyncio.sleep(3)

if __name__ == "__main__":
    asyncio.run(main())
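A key benefit of the asyncio client is fanning out many jobs at once. The sketch below shows the gather pattern with stand-in job objects; StubJob is hypothetical and only mimics the status()/output() interface of AsyncioJob from the example above, so real jobs would replace it:

```python
import asyncio

class StubJob:
    """Stand-in for AsyncioJob: reports COMPLETED after a fixed number of polls."""
    def __init__(self, name, polls_needed):
        self.name = name
        self._polls = polls_needed

    async def status(self):
        self._polls -= 1
        return "COMPLETED" if self._polls <= 0 else "IN_PROGRESS"

    async def output(self):
        return f"output of {self.name}"

async def wait_for(job, interval=0.01):
    # The same poll loop as above, expressed as a coroutine
    while await job.status() != "COMPLETED":
        await asyncio.sleep(interval)
    return await job.output()

async def run_all(jobs):
    # Poll every job concurrently; gather() preserves submission order
    return await asyncio.gather(*(wait_for(job) for job in jobs))

results = asyncio.run(run_all([StubJob("a", 1), StubJob("b", 3)]))
print(results)
```

While one job sleeps between polls, the event loop services the others, so total wait time is governed by the slowest job rather than the sum of all jobs.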

Health Check

Inspect worker and job status:
import json

endpoint_health = endpoint.health()
print(json.dumps(endpoint_health, indent=2))
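health() returns a plain dict. Its exact schema is not specified here, so the shape below (worker and job counters) is an illustrative assumption; the hypothetical helper shows how such a payload might feed a simple back-pressure check:

```python
# Assumed response shape; inspect your endpoint's actual health() output
endpoint_health = {
    "workers": {"idle": 2, "running": 1},
    "jobs": {"inQueue": 5, "inProgress": 1},
}

def queue_is_backed_up(health, max_queued_per_worker=4):
    """Rough heuristic: flag when queued jobs greatly outnumber workers."""
    workers = sum(health.get("workers", {}).values())
    queued = health.get("jobs", {}).get("inQueue", 0)
    return workers == 0 or queued > workers * max_queued_per_worker

print(queue_is_backed_up(endpoint_health))
```

A check like this can gate job submission in a batch script instead of blindly enqueueing work against an unhealthy endpoint.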

Streaming Output

Streaming requires a generator handler on the worker. Start it with return_aggregate_stream: True so the yielded chunks are aggregated and can be streamed back to the client:
from time import sleep

def handler(job):
    job_input = job["input"]["prompt"]
    for i in job_input:
        sleep(1)
        yield i

tensorone.serverless.start({
    "handler": handler,
    "return_aggregate_stream": True,
})
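Because the handler is an ordinary generator function, you can exercise it locally before deploying. The copy below drops the per-character sleep so the loop runs instantly; the commented client-side snippet is an assumption about the SDK's stream-consumption API, so verify the method name against the SDK reference:

```python
def handler(job):
    # Same generator as the deployed handler, minus the per-character sleep
    for ch in job["input"]["prompt"]:
        yield ch

chunks = list(handler({"input": {"prompt": "Hi"}}))
aggregate = "".join(chunks)
print(chunks, aggregate)

# On the client, the stream would typically be consumed like this
# (method name is an assumption; check the SDK reference):
# for chunk in run_request.stream():
#     print(chunk, end="")
```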

Check Job Status

Poll a submitted job at any time with status():
run_request = endpoint.run({"input": {"prompt": "Hello, World!"}})
status = run_request.status()
print(f"Job status: {status}")

Cancel a Job

Use cancel() to stop a job that is queued or in progress, for example when the user interrupts the script:
import time

input_payload = {
    "messages": [{"role": "user", "content": "Hello, World"}],
    "max_tokens": 2048,
    "use_openai_format": True,
}

try:
    run_request = endpoint.run(input_payload)
    while True:
        status = run_request.status()
        print(f"Current job status: {status}")

        if status == "COMPLETED":
            output = run_request.output()
            print("Job output:", output)
            break
        elif status in ["FAILED", "ERROR"]:
            print("Job failed.")
            break
        else:
            time.sleep(10)

except KeyboardInterrupt:
    print("Canceling the job...")
    run_request.cancel()
    print("Job canceled.")
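The try/except pattern above can be packaged as a context manager that guarantees cancellation whenever the block exits with an error, KeyboardInterrupt included. StubRequest is hypothetical and only mimics the cancel() method; the object returned by endpoint.run() would be passed in its place:

```python
from contextlib import contextmanager

class StubRequest:
    """Stand-in for the object returned by endpoint.run()."""
    def __init__(self):
        self.cancelled = False

    def cancel(self):
        self.cancelled = True

@contextmanager
def auto_cancel(run_request):
    # Cancel the job if the block raises, then re-raise the exception
    try:
        yield run_request
    except BaseException:
        run_request.cancel()
        raise

req = StubRequest()
try:
    with auto_cancel(req):
        raise KeyboardInterrupt  # simulate Ctrl+C mid-poll
except KeyboardInterrupt:
    pass
print(req.cancelled)
```

Catching BaseException (rather than Exception) is what lets the manager see KeyboardInterrupt and SystemExit before re-raising them.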

Timeout Handling

Cancel a job automatically if it does not finish within a specified timeout:
from time import sleep

input_payload = {"input": {"prompt": "Hello, World!"}}
run_request = endpoint.run(input_payload)
print(f"Initial status: {run_request.status()}")

run_request.cancel(timeout=3)
sleep(3)

print(f"Final status: {run_request.status()}")

Purge Queue

Remove pending jobs from the queue (in-progress jobs are unaffected):
endpoint.purge_queue(timeout=3)