Skip to main content

Starting with PyPI

Overview

The SVECTOR Python SDK is officially available on PyPI, providing comprehensive integration for Python applications with full support for both synchronous and asynchronous operations. This SDK offers intuitive model completions, advanced document processing, and seamless integration with SVECTOR's cutting-edge AI systems.

What you'll learn:

  • Installing and authenticating the SVECTOR Python SDK
  • Using the Conversations API for simplified interactions
  • Advanced Chat Completions with full message control
  • Real-time streaming responses and document processing
  • Async support for high-performance applications
  • Enterprise-grade error handling and best practices
  • Complete production examples and configuration patterns

Installation

Standard Installation

pip install svector-sdk

Development Installation

For contributing or development work:

git clone https://github.com/svector-corporation/svector-python
cd svector-python
pip install -e ".[dev]"

With Optional Dependencies

# Install with development tools
pip install "svector-sdk[dev]"

# Install with test dependencies
pip install "svector-sdk[test]"

Authentication

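Environment Variable

Set your API key as an environment variable in your shell (first line below), then create the client in Python without passing a key: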

export SVECTOR_API_KEY="your-api-key-here"
from svector import SVECTOR

# Automatically reads from SVECTOR_API_KEY environment variable
client = SVECTOR()

Direct API Key

from svector import SVECTOR

client = SVECTOR(api_key="your-api-key-here")

Get Your API Key

  1. Visit the SVECTOR Dashboard
  2. Navigate to API Keys section
  3. Generate a new API key
  4. Copy and securely store your key

Quick Start Examples

Basic Conversation

from svector import SVECTOR

client = SVECTOR()

response = client.conversations.create(
model="spec-3-turbo",
instructions="You are a helpful AI assistant that explains complex topics clearly.",
input="What is artificial intelligence?",
temperature=0.7,
)

print(response.output)
print(f"Request ID: {response.request_id}")
print(f"Token Usage: {response.usage}")

Advanced Configuration

from svector import SVECTOR

client = SVECTOR(
api_key="your-api-key",
base_url="https://spec-chat.tech",
timeout=30,
max_retries=3,
verify_ssl=True,
)

response = client.conversations.create(
model="spec-3-turbo",
instructions="You are an expert software architect with deep knowledge of distributed systems.",
input="Design a microservices architecture for a high-traffic e-commerce platform.",
temperature=0.8,
max_tokens=1000,
)

print(response.output)

Core Features

1. Conversations API

The Conversations API provides a user-friendly interface with automatic role management:

Basic Usage

response = client.conversations.create(
model="spec-3-turbo",
instructions="You are a programming tutor that helps students learn Python.",
input="How do I create a class in Python?",
temperature=0.6,
)

print(response.output)

With Context History

response = client.conversations.create(
model="spec-3-turbo",
instructions="You are a technical mentor providing step-by-step guidance.",
input="Can you show me a practical example?",
context=[
"How do I implement error handling in Python?",
"You can use try-except blocks to handle errors in Python..."
],
temperature=0.5,
)

print(response.output)

Streaming Conversations

# Request a streamed reply so text can be shown as it arrives.
stream = client.conversations.create_stream(
    model="spec-3-turbo",
    instructions="You are a creative storyteller.",
    input="Tell me a short story about AI and humanity.",
    stream=True,
)

print("Story: ", end="", flush=True)
for event in stream:
    if not event.done:
        # Still streaming: print the chunk without a trailing newline.
        print(event.content, end="", flush=True)
    else:
        # `done` is set: report completion.
        print("\n Story completed!")

2. Advanced Chat Completions

For applications requiring full control over conversation flow:

Role-Based Messages

response = client.chat.create(
model="spec-3-turbo",
messages=[
{"role": "system", "content": "You are an expert DevOps engineer specializing in Kubernetes and cloud infrastructure."},
{"role": "user", "content": "Design a CI/CD pipeline for a microservices application using GitOps principles."},
{"role": "assistant", "content": "I'll design a comprehensive GitOps-based CI/CD pipeline for you..."},
{"role": "user", "content": "How would you handle rollbacks in this setup?"}
],
temperature=0.7,
max_tokens=1500,
)

print(response["choices"][0]["message"]["content"])

Multi-turn Conversation

conversation = [
{"role": "system", "content": "You are a helpful programming assistant."},
{"role": "user", "content": "How do I reverse a string in Python?"},
{"role": "assistant", "content": "You can reverse a string using slicing: string[::-1]"},
{"role": "user", "content": "Can you show me other methods?"}
]

response = client.chat.create(
model="spec-3-turbo",
messages=conversation,
temperature=0.5,
)

print(response["choices"][0]["message"]["content"])

Developer Role (System-level Instructions)

response = client.chat.create(
model="spec-3-turbo",
messages=[
{"role": "developer", "content": "You are an expert code reviewer. Provide detailed feedback on code quality, security, and best practices."},
{"role": "user", "content": "Please review this Python code: def add(a, b): return a + b"}
],
temperature=0.3,
)

print(response["choices"][0]["message"]["content"])

3. Real-time Streaming

Chat Streaming

# Stream a chat completion and print each content delta as it arrives.
stream = client.chat.create_stream(
    model="spec-3-turbo",
    messages=[
        {"role": "system", "content": "You are a helpful programming assistant."},
        {"role": "user", "content": "Explain the differences between SQL and NoSQL databases with practical examples."}
    ],
    stream=True,
)

print("Response: ", end="", flush=True)
for event in stream:
    # Skip events that carry no choices.
    if "choices" in event and len(event["choices"]) > 0:
        delta = event["choices"][0].get("delta", {})
        content = delta.get("content", "")
        if content:
            print(content, end="", flush=True)
print()  # Finish the line once the stream is exhausted.

4. Document Processing & Analysis

Upload and Process Documents

# Upload from file (binary mode; the handle is closed when the block exits)
with open("research-paper.pdf", "rb") as f:
    file_response = client.files.create(f, purpose="default")
file_id = file_response.file_id

print(f"File uploaded: {file_id}")

# Ask questions about the document by referencing the uploaded file ID
response = client.conversations.create(
    model="spec-3-turbo",
    instructions="You are a research assistant that analyzes academic papers.",
    input="What are the key findings and methodology in this paper?",
    files=[{"type": "file", "id": file_id}],
    temperature=0.3,
)

print(response.output)

Multiple Document Analysis

# Upload multiple documents and collect their file IDs
documents = []
for file_path in ["manual.pdf", "faq.docx", "notes.txt"]:
    with open(file_path, "rb") as f:
        file_response = client.files.create(f, purpose="default")
    documents.append(file_response.file_id)

# Analyze all documents together in a single request
response = client.conversations.create(
    model="spec-3-turbo",
    instructions="You are a document analyst. Provide comprehensive analysis and cross-reference information from all documents.",
    input="Compare and contrast the information across all documents. What are the key themes and any discrepancies?",
    files=[{"type": "file", "id": doc_id} for doc_id in documents],
    temperature=0.2,
)

print(response.output)

Upload from String Content

content = """
# Research Notes
This document contains important findings from our Q4 analysis:
1. Revenue increased by 25%
2. Customer satisfaction improved by 15%
3. New product adoption rate: 40%
"""

file_response = client.files.create(
content.encode(),
purpose="default",
filename="research-notes.md"
)

# Analyze the content
response = client.conversations.create(
model="spec-3-turbo",
instructions="You are a business analyst. Analyze the data and provide insights.",
input="What are the key business metrics and their implications?",
files=[{"type": "file", "id": file_response.file_id}],
)

print(response.output)

Available Models & Selection Guide

Model Specifications

| Model | Use Case | Performance | Context Length | Best For |
| --- | --- | --- | --- | --- |
| spec-3-turbo | General purpose, fast responses | High speed, good quality | 1M tokens | Quick responses, general tasks |
| spec-3 | Balanced performance and quality | Standard speed, high quality | 1M tokens | Most applications |
| theta-35 | Complex reasoning, analysis | Slower, highest quality | 40K tokens | Research, complex analysis |
| theta-35-mini | Simple tasks, cost-effective | Very fast, basic quality | 40K tokens | Simple Q&A, quick tasks |
| spec-2-mini | Super fast responses | High speed, basic quality | 32K tokens | — |

List Available Models

# List all available models
models = client.models.list()
print("Available models:")
for model in models["models"]:
    # `description` is read with a fallback in case an entry lacks it.
    print(f"- {model['id']}: {model.get('description', 'No description available')}")

Model Selection Strategy

def select_optimal_model(task_type: str, complexity: str) -> str:
    """Select the best model based on task type and complexity.

    Args:
        task_type: One of 'conversation', 'analysis' or 'coding'.
        complexity: One of 'simple', 'moderate' or 'complex'.

    Returns:
        The model identifier for the combination, or 'spec-3-turbo' when
        either the task type or the complexity is not recognised.
    """
    model_map = {
        'conversation': {
            'simple': 'theta-35-mini',
            'moderate': 'spec-3-turbo',
            'complex': 'spec-3',
        },
        'analysis': {
            'simple': 'spec-3-turbo',
            'moderate': 'spec-3',
            'complex': 'theta-35',
        },
        'coding': {
            'simple': 'spec-3-turbo',
            'moderate': 'spec-3',
            'complex': 'theta-35',
        },
    }

    # Two-level lookup; the empty dict keeps the second .get() safe for
    # unknown task types.
    return model_map.get(task_type, {}).get(complexity, 'spec-3-turbo')

# Usage examples
quick_model = select_optimal_model('conversation', 'simple')
analysis_model = select_optimal_model('analysis', 'complex')

print(f"Quick conversations: {quick_model}")
print(f"Complex analysis: {analysis_model}")

Error Handling & Resilience

Comprehensive Error Handling

from svector import (
SVECTOR,
AuthenticationError,
RateLimitError,
NotFoundError,
UnprocessableEntityError,
APIError,
APIConnectionError,
APIConnectionTimeoutError
)

def robust_api_call(client, prompt: str, max_retries: int = 3):
    """Make API calls with comprehensive error handling and retry logic.

    Args:
        client: Client object exposing ``conversations.create``.
        prompt: Text sent as the ``input`` of the conversation.
        max_retries: Total number of attempts (must be at least 1).

    Returns:
        The response object returned by ``client.conversations.create``.

    Raises:
        ValueError: If ``max_retries`` is smaller than 1 (previously the
            function silently returned ``None`` in that case).
        AuthenticationError, NotFoundError, UnprocessableEntityError:
            Re-raised immediately; these are not retried.
        RateLimitError, APIConnectionError, APIError: Re-raised once the
            final attempt has failed.
    """
    import time

    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    for attempt in range(max_retries):
        can_retry = attempt < max_retries - 1
        try:
            response = client.conversations.create(
                model="spec-3-turbo",
                instructions="You are a helpful assistant.",
                input=prompt,
            )
            return response

        except AuthenticationError as e:
            print(f"Authentication failed: {e}")
            print("Please check your API key at https://www.svector.co.in")
            raise

        except RateLimitError as e:
            print(f"⏱️ Rate limit exceeded: {e}")
            if can_retry:
                wait_time = 2 ** attempt  # Exponential backoff
                print(f"Waiting {wait_time} seconds before retry...")
                time.sleep(wait_time)
                continue
            raise

        except APIConnectionError as e:
            print(f"🔌 Connection error: {e}")
            if can_retry:
                wait_time = 1 * (attempt + 1)  # Linear backoff
                print(f"Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
                continue
            raise

        except NotFoundError as e:
            print(f"Resource not found: {e}")
            raise

        except UnprocessableEntityError as e:
            print(f"Invalid request: {e}")
            raise

        except APIError as e:
            print(f"API error: {e} (Status: {e.status_code})")
            print(f"Request ID: {getattr(e, 'request_id', 'N/A')}")
            if can_retry:
                time.sleep(2 ** attempt)
                continue
            raise

        except Exception as e:
            print(f"Unexpected error: {e}")
            raise

# Usage
client = SVECTOR()

try:
    result = robust_api_call(client, "Explain quantum computing in simple terms.")
    print(" Success:", result.output)
except Exception as e:
    # Top-level boundary: report whatever robust_api_call re-raised.
    print(f"Final error: {e}")

Async Support

Basic Async Usage

import asyncio
from svector import AsyncSVECTOR

async def async_conversation():
    """Send one conversation request with the async client and return the response.

    The client is used as an async context manager for the duration of the
    request; the response output is printed before it is returned.
    """
    async with AsyncSVECTOR() as client:
        response = await client.conversations.create(
            model="spec-3-turbo",
            instructions="You are a helpful AI assistant.",
            input="Explain the benefits of asynchronous programming.",
            temperature=0.7,
        )
        print(response.output)
        return response

# Run async function
asyncio.run(async_conversation())

Async Streaming

async def async_streaming():
    """Stream a conversation reply with the async client, printing chunks as they arrive."""
    async with AsyncSVECTOR() as client:
        stream = await client.conversations.create_stream(
            model="spec-3-turbo",
            instructions="You are a creative storyteller.",
            input="Write a poem about the future of AI.",
            stream=True,
        )

        print("Poem: ", end="", flush=True)
        async for event in stream:
            # Events flagged `done` carry no text to print here.
            if not event.done:
                print(event.content, end="", flush=True)
        print()

asyncio.run(async_streaming())

Concurrent Async Requests

async def concurrent_requests():
    """Run several conversation requests concurrently and print a preview of each result.

    ``asyncio.gather`` is called with ``return_exceptions=True`` so one failed
    request does not cancel the others; failures are reported per topic.
    """
    async with AsyncSVECTOR() as client:
        # Multiple concurrent conversations
        topics = ["artificial intelligence", "quantum computing", "blockchain", "machine learning"]

        tasks = [
            client.conversations.create(
                model="spec-3-turbo",
                instructions="You are an expert educator. Explain complex topics clearly.",
                input=f"Explain {topic} in simple terms with practical examples.",
                temperature=0.6,
            )
            for topic in topics
        ]

        print("🚀 Starting concurrent requests...")
        responses = await asyncio.gather(*tasks, return_exceptions=True)

        # gather() preserves input order, so results line up with topics.
        for topic, response in zip(topics, responses):
            if isinstance(response, Exception):
                print(f"{topic}: Error - {response}")
            else:
                print(f" {topic}: {response.output[:100]}...")
            print("─" * 50)

asyncio.run(concurrent_requests())

Complete Production Examples

Enterprise Document Analysis System

from svector import SVECTOR, AuthenticationError, RateLimitError
from pathlib import Path
import time
from typing import List, Dict, Any, Optional

class EnterpriseDocumentAnalyzer:
    """Enterprise-grade document analysis system with SVECTOR AI.

    Keeps track of the file IDs it has uploaded and caches analysis results
    in memory, keyed by analysis type, query and target files.
    """

    # System prompt per analysis type; unknown types use "comprehensive".
    _INSTRUCTIONS: Dict[str, str] = {
        "summary": "You are an expert document summarizer. Provide clear, structured summaries with key points and actionable insights.",
        "technical": "You are a technical analyst. Focus on technical details, methodologies, and implementation aspects.",
        "business": "You are a business analyst. Analyze from a strategic perspective, focusing on opportunities, risks, and business implications.",
        "comprehensive": "You are a senior research analyst. Provide detailed analysis with insights, patterns, recommendations, and actionable next steps.",
        "comparative": "You are a comparative analyst. Compare and contrast documents, identify patterns, differences, and relationships.",
    }

    def __init__(self, api_key: Optional[str] = None):
        """Create the SVECTOR client and empty upload/cache bookkeeping.

        Args:
            api_key: API key forwarded to ``SVECTOR``; ``None`` is passed
                through unchanged.
        """
        self.client = SVECTOR(
            api_key=api_key,
            timeout=60,
            max_retries=3,
        )
        self.uploaded_files: List[str] = []
        self.analysis_cache: Dict[str, Any] = {}

    def upload_document(self, file_path: str) -> str:
        """Upload a document and return its file ID.

        Any exception from opening or uploading the file is printed and
        re-raised.
        """
        try:
            with open(file_path, "rb") as f:
                file_response = self.client.files.create(
                    f,
                    purpose="default",
                    filename=Path(file_path).name,
                )

            self.uploaded_files.append(file_response.file_id)
            print(f" Uploaded: {file_path} (ID: {file_response.file_id})")
            return file_response.file_id

        except Exception as error:
            print(f"Failed to upload {file_path}: {error}")
            raise

    def upload_document_from_text(self, content: str, filename: str) -> str:
        """Upload text content as a document and return its file ID."""
        file_response = self.client.files.create(
            content.encode(),
            purpose="default",
            filename=filename,
        )
        self.uploaded_files.append(file_response.file_id)
        print(f" Created document: {filename} (ID: {file_response.file_id})")
        return file_response.file_id

    def analyze_documents(self,
                          query: str,
                          analysis_type: str = "comprehensive",
                          file_ids: Optional[List[str]] = None) -> Dict[str, Any]:
        """Analyze uploaded documents with specified query and analysis type.

        Args:
            query: Question or instruction sent as the conversation input.
            analysis_type: Key selecting the system prompt; unknown values
                use the "comprehensive" prompt.
            file_ids: Files to analyze; when empty or ``None`` every file
                uploaded through this instance is used.

        Returns:
            A dict with the query, analysis type, analysis text, document
            count, request ID, usage and a timestamp.

        Raises:
            ValueError: If there are no files to analyze.
        """
        # Use provided file IDs or all uploaded files
        target_files = file_ids or self.uploaded_files

        if not target_files:
            raise ValueError("No documents available for analysis")

        # Build the key from the structured parts rather than a hash of the
        # concatenated text: query "ab" + file "c" must not collide with
        # query "a" + file "bc".
        cache_key = repr((analysis_type, query, tuple(target_files)))

        if cache_key in self.analysis_cache:
            print("📋 Retrieved from cache")
            return self.analysis_cache[cache_key]

        try:
            print(f"🔍 Analyzing {len(target_files)} documents...")

            response = self.client.conversations.create(
                model="spec-3-turbo",
                instructions=self._INSTRUCTIONS.get(analysis_type, self._INSTRUCTIONS["comprehensive"]),
                input=query,
                files=[{"type": "file", "id": file_id} for file_id in target_files],
                temperature=0.2,  # Lower temperature for factual analysis
            )

            result = {
                "query": query,
                "analysis_type": analysis_type,
                "analysis": response.output,
                "document_count": len(target_files),
                "request_id": response.request_id,
                "usage": response.usage,
                "timestamp": time.time(),
            }

            # Cache the result
            self.analysis_cache[cache_key] = result

            return result

        except Exception as error:
            print(f"Analysis failed: {error}")
            raise

    def compare_documents(self,
                          comparison_query: str,
                          file_ids: Optional[List[str]] = None) -> Dict[str, Any]:
        """Compare multiple documents.

        Raises:
            ValueError: If fewer than two documents are available.
        """
        target_files = file_ids or self.uploaded_files

        if len(target_files) < 2:
            raise ValueError("Need at least 2 documents for comparison")

        return self.analyze_documents(
            f"Compare and contrast the documents regarding: {comparison_query}",
            "comparative",
            target_files,
        )

    def batch_analyze(self, queries: List[str], analysis_type: str = "comprehensive") -> List[Dict[str, Any]]:
        """Perform batch analysis with multiple queries.

        A failing query does not stop the batch; it is recorded as a dict
        with ``"success": False`` and the error text.
        """
        results = []

        for i, query in enumerate(queries, 1):
            try:
                print(f"🔄 Processing query {i}/{len(queries)}: {query[:50]}...")
                result = self.analyze_documents(query, analysis_type)
                results.append(result)

                # Rate limiting - wait between requests
                if i < len(queries):
                    time.sleep(1)

            except Exception as error:
                print(f"Failed to process query {i}: {error}")
                results.append({
                    "query": query,
                    "error": str(error),
                    "success": False,
                })

        return results

    def generate_report(self, results: List[Dict[str, Any]]) -> str:
        """Generate a comprehensive report from analysis results.

        Entries without a ``"success"`` key are treated as successful, which
        matches the dicts returned by ``analyze_documents``.
        """
        report_content = []

        for i, result in enumerate(results, 1):
            if result.get("success", True):
                report_content.extend([
                    f"## Analysis {i}: {result['query']}",
                    f"**Type:** {result['analysis_type']}",
                    f"**Documents:** {result['document_count']}",
                    "**Analysis:**",
                    result['analysis'],
                    "---",
                ])
            else:
                report_content.extend([
                    f"## Analysis {i}: FAILED",
                    f"**Query:** {result['query']}",
                    f"**Error:** {result['error']}",
                    "---",
                ])

        return "\n\n".join(report_content)

    def get_statistics(self) -> Dict[str, Any]:
        """Get usage statistics (counts plus a copy of the uploaded file IDs)."""
        return {
            "uploaded_files": len(self.uploaded_files),
            "cached_analyses": len(self.analysis_cache),
            "file_ids": self.uploaded_files.copy(),
        }

# Usage Example
def main():
    """Demonstrate the analyzer: upload documents, run analyses and save a report.

    Any exception raised along the way is printed rather than propagated,
    since this is the top-level entry point of the example.
    """
    # Initialize analyzer
    analyzer = EnterpriseDocumentAnalyzer()

    # Upload documents
    try:
        # Upload various document types
        analyzer.upload_document("./reports/quarterly-report.pdf")
        analyzer.upload_document("./reports/annual-summary.docx")

        # Upload text content
        analyzer.upload_document_from_text("""
# Executive Summary
Q4 Performance:
- Revenue: $2.5M (↑15%)
- Customer Acquisition: 1,200 new customers
- Product Launches: 3 successful launches
- Market Expansion: 2 new regions

Key Challenges:
- Supply chain disruptions
- Increased competition
- Talent acquisition
""", "executive-summary.md")

        # Single analysis
        result = analyzer.analyze_documents(
            "What are the key business metrics and their trends?",
            "business"
        )
        print("📊 Analysis Result:")
        print(result["analysis"])

        # Batch analysis
        queries = [
            "What are the major achievements in Q4?",
            "What challenges were identified and how were they addressed?",
            "What are the strategic priorities for the next quarter?"
        ]

        batch_results = analyzer.batch_analyze(queries, "business")

        # Generate comprehensive report
        report = analyzer.generate_report(batch_results)

        # Save report. The encoding is explicit because the report text can
        # contain non-ASCII characters that the platform default may reject.
        with open("analysis_report.md", "w", encoding="utf-8") as f:
            f.write(report)

        print(" Analysis complete! Report saved to analysis_report.md")

        # Print statistics
        stats = analyzer.get_statistics()
        print(f"Statistics: {stats}")

    except Exception as e:
        print(f"Error: {e}")


if __name__ == "__main__":
    main()

Intelligent Chat Application

from svector import SVECTOR, RateLimitError
import json
from typing import List, Dict, Optional
from datetime import datetime

class IntelligentChatBot:
    """Enterprise-grade chatbot with conversation management.

    Each conversation is a list of message dicts whose first entry is the
    system message. Failed requests are recorded as entries with the role
    "error" and are never sent back to the model as context.
    """

    def __init__(self, api_key: Optional[str] = None):
        """Create the SVECTOR client and an empty conversation store."""
        self.client = SVECTOR(api_key=api_key, timeout=30, max_retries=3)
        self.conversations: Dict[str, List[Dict]] = {}
        self.default_instructions = "You are a helpful, knowledgeable AI assistant. Be conversational, accurate, and helpful."

    def create_conversation(self, conversation_id: str, system_instructions: Optional[str] = None) -> str:
        """Create a new conversation thread.

        Any existing conversation with the same ID is replaced. The thread
        starts with a single system message holding the instructions.
        """
        instructions = system_instructions or self.default_instructions

        # The system message always lives at index 0.
        self.conversations[conversation_id] = [{
            "role": "system",
            "content": instructions,
            "timestamp": datetime.now().isoformat()
        }]

        return conversation_id

    def chat(self,
             conversation_id: str,
             user_message: str,
             stream: bool = False,
             model: str = "spec-3-turbo") -> str:
        """Send a message and get a response.

        Args:
            conversation_id: Thread to use; created on demand if unknown.
            user_message: Text sent as the request input.
            stream: When true the reply is printed while it streams.
            model: Model identifier forwarded to the API.

        Returns:
            The assistant reply, or an error message string if the request
            failed (the error is also recorded in the history).
        """
        # Create conversation if it doesn't exist
        if conversation_id not in self.conversations:
            self.create_conversation(conversation_id)

        history = self.conversations[conversation_id]

        # Add user message
        history.append({
            "role": "user",
            "content": user_message,
            "timestamp": datetime.now().isoformat()
        })

        try:
            # Prior turns only: drop the system message (index 0 of the full
            # history, not of a truncated slice) and the message just
            # appended, which is sent separately as `input`. Local "error"
            # entries are bookkeeping and are not model context.
            prior_turns = [
                msg["content"] for msg in history[1:-1] if msg["role"] != "error"
            ]
            context = prior_turns[-20:]

            if stream:
                return self._stream_response(conversation_id, user_message, context, model)
            return self._get_response(conversation_id, user_message, context, model)

        except RateLimitError:
            return self._record_error(
                conversation_id,
                "Rate limit exceeded. Please wait before sending another message.",
            )

        except Exception as e:
            return self._record_error(conversation_id, f"An error occurred: {str(e)}")

    def _record_error(self, conversation_id: str, error_msg: str) -> str:
        """Append an "error" entry to the conversation and return the message."""
        self.conversations[conversation_id].append({
            "role": "error",
            "content": error_msg,
            "timestamp": datetime.now().isoformat()
        })
        return error_msg

    def _get_response(self, conversation_id: str, user_message: str, context: List[str], model: str) -> str:
        """Get a standard response."""
        system_instructions = self.conversations[conversation_id][0]["content"]

        response = self.client.conversations.create(
            model=model,
            instructions=system_instructions,
            input=user_message,
            context=context[-10:],  # Last 10 messages for context
            temperature=0.7,
        )

        # Add AI response to conversation
        self.conversations[conversation_id].append({
            "role": "assistant",
            "content": response.output,
            "timestamp": datetime.now().isoformat(),
            "usage": response.usage
        })

        return response.output

    def _stream_response(self, conversation_id: str, user_message: str, context: List[str], model: str) -> str:
        """Get a streaming response, printing chunks as they arrive."""
        system_instructions = self.conversations[conversation_id][0]["content"]

        stream = self.client.conversations.create_stream(
            model=model,
            instructions=system_instructions,
            input=user_message,
            context=context[-10:],
            stream=True,
        )

        print("Assistant: ", end="", flush=True)
        chunks = []

        for event in stream:
            # Skip the final event and any event without text.
            if not event.done and event.content:
                print(event.content, end="", flush=True)
                chunks.append(event.content)
        print()

        full_response = "".join(chunks)

        # Add AI response to conversation
        self.conversations[conversation_id].append({
            "role": "assistant",
            "content": full_response,
            "timestamp": datetime.now().isoformat()
        })

        return full_response

    def get_conversation_history(self, conversation_id: str) -> List[Dict]:
        """Get full conversation history (empty list for unknown IDs)."""
        return self.conversations.get(conversation_id, [])

    def clear_conversation(self, conversation_id: str) -> None:
        """Clear a conversation, keeping only its system message."""
        if conversation_id in self.conversations:
            system_msg = self.conversations[conversation_id][0]  # Keep system message
            self.conversations[conversation_id] = [system_msg]

    def export_conversation(self, conversation_id: str, file_path: str) -> None:
        """Export conversation to JSON file; unknown IDs are ignored."""
        if conversation_id in self.conversations:
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(self.conversations[conversation_id], f, indent=2)

    def list_conversations(self) -> List[str]:
        """List all conversation IDs."""
        return list(self.conversations.keys())

# Usage Example
def main():
    """Run an interactive chat session on the terminal.

    Supported commands: /history, /clear, /export, /switch <id>, /quit.
    Anything else is sent to the current conversation as a message.
    """
    # Initialize chatbot
    chatbot = IntelligentChatBot()

    # Create specialized conversations
    chatbot.create_conversation(
        "tech-support",
        "You are a technical support specialist. Help users with technical issues in a clear, step-by-step manner."
    )

    chatbot.create_conversation(
        "code-review",
        "You are a senior software engineer specializing in code review. Provide constructive feedback on code quality, best practices, and security."
    )

    # Interactive chat session
    print("🤖 SVECTOR Intelligent ChatBot")
    print("Available conversations: tech-support, code-review, general")
    # The help line now lists /switch, which the loop below handles.
    print("Commands: /history, /clear, /export, /switch <id>, /quit")

    conversation_id = "general"
    chatbot.create_conversation(conversation_id)

    while True:
        try:
            user_input = input(f"\n[{conversation_id}] You: ")
        except (EOFError, KeyboardInterrupt):
            # Treat Ctrl-D / Ctrl-C as a request to quit.
            print()
            break

        command = user_input.strip().lower()

        if not command:
            # Nothing typed: do not send an empty message to the API.
            continue
        if command == '/quit':
            break
        elif command == '/history':
            history = chatbot.get_conversation_history(conversation_id)
            for msg in history[1:]:  # Skip system message
                print(f"{msg['role']}: {msg['content'][:100]}...")
            continue
        elif command == '/clear':
            chatbot.clear_conversation(conversation_id)
            print("Conversation cleared.")
            continue
        elif command.startswith('/export'):
            filename = f"conversation_{conversation_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
            chatbot.export_conversation(conversation_id, filename)
            print(f"Conversation exported to {filename}")
            continue
        elif command.startswith('/switch'):
            parts = user_input.split()
            if len(parts) < 2:
                print("Usage: /switch <conversation-id>")
                continue
            new_conv_id = parts[1]
            if new_conv_id in chatbot.list_conversations():
                conversation_id = new_conv_id
                print(f"Switched to conversation: {conversation_id}")
            else:
                print(f"Unknown conversation: {new_conv_id}")
            continue

        # Get response (streaming). Successful replies are printed while
        # they stream; a failure is only returned, so show it explicitly.
        response = chatbot.chat(conversation_id, user_input, stream=True)
        history = chatbot.get_conversation_history(conversation_id)
        if history and history[-1]["role"] == "error":
            print(f"Assistant: {response}")


if __name__ == "__main__":
    main()

Advanced Configuration

Client Configuration

from svector import SVECTOR

client = SVECTOR(
api_key="your-api-key",
base_url="https://spec-chat.tech", # Custom API endpoint
timeout=60, # Request timeout in seconds
max_retries=5, # Retry failed requests
verify_ssl=True, # SSL verification
http_client=None, # Custom HTTP client
)

Per-Request Configuration

response = client.conversations.create(
model="spec-3-turbo",
instructions="You are a helpful assistant.",
input="Hello world",
timeout=120, # Override timeout for this request
headers={ # Additional headers
"X-Custom-Header": "value",
"X-Request-Source": "my-enterprise-app"
}
)

Environment-Based Configuration

import os
from svector import SVECTOR

def create_client(environment: str = "production") -> SVECTOR:
    """Create SVECTOR client based on environment.

    Args:
        environment: "development", "staging" or "production". Any other
            value falls back to the production settings.

    Returns:
        A client built from the selected settings. The API key is read from
        the environment variable named in that configuration; if the
        variable is unset, ``None`` is passed as ``api_key``.
    """
    configs = {
        "development": {
            "api_key": os.getenv("SVECTOR_DEV_API_KEY"),
            "base_url": "https://dev-api.spec-chat.tech",
            "timeout": 30,
            "max_retries": 2,
        },
        "staging": {
            "api_key": os.getenv("SVECTOR_STAGING_API_KEY"),
            "base_url": "https://staging-api.spec-chat.tech",
            "timeout": 45,
            "max_retries": 3,
        },
        "production": {
            "api_key": os.getenv("SVECTOR_API_KEY"),
            "base_url": "https://spec-chat.tech",
            "timeout": 60,
            "max_retries": 5,
        },
    }

    # NOTE(review): a misspelled environment name silently selects the
    # production settings — confirm this fallback is intended.
    config = configs.get(environment, configs["production"])
    return SVECTOR(**config)

# Usage
client = create_client("production")

Best Practices

1. Use Environment Variables

import os
from svector import SVECTOR

# Recommended
client = SVECTOR(api_key=os.environ.get("SVECTOR_API_KEY"))

# Never do this
client = SVECTOR(api_key="sk-hardcoded-key-here")

2. Implement Proper Error Handling

from svector import SVECTOR, RateLimitError, AuthenticationError
import time

def safe_api_call(client, prompt, max_retries=3):
    """Safe API call with retry logic.

    Rate-limit errors are retried with exponential backoff (1s, 2s, 4s, ...)
    and re-raised after the final attempt; authentication errors are
    re-raised immediately.

    Args:
        client: Client object exposing ``conversations.create``.
        prompt: Text sent as the request input.
        max_retries: Total number of attempts (must be at least 1).

    Raises:
        ValueError: If ``max_retries`` is smaller than 1 (previously the
            function silently returned ``None`` in that case).
    """
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    for attempt in range(max_retries):
        try:
            return client.conversations.create(
                model="spec-3-turbo",
                instructions="You are helpful.",
                input=prompt
            )
        except RateLimitError:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt
                time.sleep(wait_time)
            else:
                raise
        except AuthenticationError:
            # Don't retry authentication errors
            raise

3. Use Context Managers for Async

#  Recommended
async with AsyncSVECTOR() as client:
response = await client.conversations.create(...)

# Manual cleanup required
client = AsyncSVECTOR()
try:
response = await client.conversations.create(...)
finally:
await client.close()

4. Select Appropriate Models

# For quick responses
model = "spec-3-turbo"

# For complex analysis
model = "theta-35"

# For simple tasks
model = "theta-35-mini"

5. Cache File Uploads

# Upload once, use multiple times: maps file path -> uploaded file ID
file_cache = {}

def get_or_upload_file(client, file_path):
    """Return the file ID for ``file_path``, uploading it only on first use.

    The cache is keyed by the path string exactly as given, so two
    different spellings of the same path are uploaded separately.
    """
    if file_path not in file_cache:
        with open(file_path, "rb") as f:
            response = client.files.create(f, purpose="default")
        file_cache[file_path] = response.file_id
    return file_cache[file_path]

Testing

Unit Tests

import pytest
from svector import SVECTOR, AuthenticationError

def test_client_initialization():
    """The client exposes the API key it was constructed with."""
    client = SVECTOR(api_key="test-key")
    assert client.api_key == "test-key"

def test_invalid_api_key():
    """A request made with an invalid key is expected to raise AuthenticationError."""
    client = SVECTOR(api_key="invalid-key")

    with pytest.raises(AuthenticationError):
        client.conversations.create(
            model="spec-3-turbo",
            instructions="Test",
            input="Test"
        )

# Run tests
# pytest test_svector.py

Integration Tests

import os
from svector import SVECTOR

def test_basic_conversation():
    """Integration test: a real request returns output and a request ID.

    Reads the key from the SVECTOR_TEST_API_KEY environment variable.
    """
    client = SVECTOR(api_key=os.environ.get("SVECTOR_TEST_API_KEY"))

    response = client.conversations.create(
        model="spec-3-turbo",
        instructions="You are a test assistant.",
        input="Say 'Hello, World!'"
    )

    assert response.output
    assert "Hello" in response.output
    assert response.request_id