SDKs
Python Endpoints
Interacting with Tensor One Endpoints.
This documentation provides detailed instructions on how to use the Tensor One Python SDK to interact with various endpoints. You can perform synchronous and asynchronous operations, stream data, and check the health status of endpoints.
Prerequisites
Before using the Tensor One SDK:
- Install the Tensor One Python SDK.
- Configure your API key.
- Set your Endpoint ID and initialize the Endpoint class.
import os

import tensorone

# Read the API key from the environment and configure the SDK.
tensorone.api_key = os.getenv("TENSORONE_API_KEY")

# Initialize the Endpoint class with your endpoint ID.
endpoint = tensorone.Endpoint("YOUR_ENDPOINT_ID")
Run Synchronously
Use run_sync() to execute a job and block until it completes or times out.
try:
    run_request = endpoint.run_sync(
        {
            "input": {
                "prompt": "Hello, world!",
            }
        },
        timeout=60,
    )
    print(run_request)
except TimeoutError:
    print("Job timed out.")
Run Asynchronously (Standard Python)
Use run() to submit a job without blocking, then poll its status and retrieve the output.
input_payload = {"input": {"prompt": "Hello, World!"}}

try:
    run_request = endpoint.run(input_payload)

    # Check the status without blocking.
    status = run_request.status()
    print(f"Initial job status: {status}")

    if status != "COMPLETED":
        # Wait up to 60 seconds for the result.
        output = run_request.output(timeout=60)
    else:
        output = run_request.output()
    print(f"Job output: {output}")
except Exception as e:
    print(f"An error occurred: {e}")
Run Asynchronously (with asyncio)
Use AsyncioEndpoint with an aiohttp session to submit and poll jobs inside an event loop.
import asyncio

import aiohttp

from tensorone import AsyncioEndpoint, AsyncioJob


async def main():
    async with aiohttp.ClientSession() as session:
        input_payload = {"prompt": "Hello, World!"}

        endpoint = AsyncioEndpoint("YOUR_ENDPOINT_ID", session)
        job: AsyncioJob = await endpoint.run(input_payload)

        while True:
            status = await job.status()
            print(f"Current job status: {status}")
            if status == "COMPLETED":
                output = await job.output()
                print("Job output:", output)
                break
            elif status == "FAILED":
                print("Job failed.")
                break
            else:
                await asyncio.sleep(3)


if __name__ == "__main__":
    asyncio.run(main())
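The main advantage of the asyncio client is submitting several jobs concurrently. The sketch below reuses only the calls shown above (endpoint.run, job.status, job.output); the prompts and polling interval are illustrative.
import asyncio

import aiohttp

from tensorone import AsyncioEndpoint, AsyncioJob


async def run_one(endpoint: AsyncioEndpoint, prompt: str):
    """Submit a single job and poll until it reaches a terminal state."""
    job: AsyncioJob = await endpoint.run({"prompt": prompt})
    while True:
        status = await job.status()
        if status == "COMPLETED":
            return await job.output()
        if status == "FAILED":
            return None
        await asyncio.sleep(3)


async def main():
    async with aiohttp.ClientSession() as session:
        endpoint = AsyncioEndpoint("YOUR_ENDPOINT_ID", session)
        prompts = ["Hello, World!", "Goodbye, World!"]  # illustrative inputs
        # Poll all jobs concurrently instead of one after another.
        results = await asyncio.gather(*(run_one(endpoint, p) for p in prompts))
        for prompt, result in zip(prompts, results):
            print(f"{prompt!r} -> {result}")


if __name__ == "__main__":
    asyncio.run(main())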
Health Check
Check job and worker status.
import json
endpoint_health = endpoint.health()
print(json.dumps(endpoint_health, indent=2))
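As a usage sketch, you might wait for the endpoint to report available workers before submitting work. The field names checked below ("workers", "ready") are hypothetical, not a documented schema; inspect the health payload printed above to see what your endpoint actually returns.
import time

# Hypothetical readiness gate: the "workers"/"ready" keys are assumptions.
# Adjust them to match the health payload your endpoint returns.
for _ in range(10):
    health = endpoint.health()
    if health.get("workers", {}).get("ready", 0) > 0:
        print("Workers are ready, submitting job...")
        break
    print("No ready workers yet, retrying in 5 seconds...")
    time.sleep(5)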
Streaming Output
To stream output, the endpoint handler must yield results and be started with return_aggregate_stream: True.
from time import sleep

import tensorone


def handler(job):
    # Yield the prompt back one character at a time to simulate streaming.
    job_input = job["input"]["prompt"]
    for char in job_input:
        sleep(1)
        yield char


tensorone.serverless.start({
    "handler": handler,
    "return_aggregate_stream": True,
})
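On the client side, you would consume the stream as it is produced. The sketch below assumes the run request exposes a stream() iterator, which is not shown elsewhere in this section; if your SDK version names it differently, adapt accordingly.
# Assumption: run_request.stream() yields chunks as the handler produces them.
run_request = endpoint.run({"input": {"prompt": "Hello, World!"}})

for chunk in run_request.stream():
    print(chunk, end="", flush=True)
print()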
Status
Check the status of a job.
run_request = endpoint.run({"input": {"prompt": "Hello, World!"}})
status = run_request.status()
print(f"Job status: {status}")
Cancel
Cancel a running job, e.g., on user interrupt.
import time
input_payload = {
    "messages": [{"role": "user", "content": "Hello, World"}],
    "max_tokens": 2048,
    "use_openai_format": True,
}

try:
    run_request = endpoint.run(input_payload)

    while True:
        status = run_request.status()
        print(f"Current job status: {status}")
        if status == "COMPLETED":
            output = run_request.output()
            print("Job output:", output)
            break
        elif status in ["FAILED", "ERROR"]:
            print("Job failed.")
            break
        else:
            time.sleep(10)
except KeyboardInterrupt:
    print("Canceling the job...")
    run_request.cancel()
    print("Job canceled.")
Timeout Handling
Cancel jobs exceeding a timeout.
from time import sleep
input_payload = {"input": {"prompt": "Hello, World!"}}
run_request = endpoint.run(input_payload)
print(f"Initial status: {run_request.status()}")
run_request.cancel(timeout=3)
sleep(3)
print(f"Final status: {run_request.status()}")
Purge Queue
Clear pending jobs from the queue (does not affect in-progress jobs).
endpoint.purge_queue(timeout=3)