Getting Started with PyPI
Overview
The SVECTOR Python SDK is officially distributed on PyPI and supports both synchronous and asynchronous operation. It provides model completions, document processing, and streaming responses for integrating SVECTOR's AI systems into Python applications.
What you'll learn:
- Installing and authenticating the SVECTOR Python SDK
- Using the Conversations API for simplified interactions
- Advanced Chat Completions with full message control
- Real-time streaming responses and document processing
- Async support for high-performance applications
- Enterprise-grade error handling and best practices
- Complete production examples and configuration patterns
Installation
Standard Installation
pip install svector-sdk
Development Installation
For contributing or development work:
git clone https://github.com/svector-corporation/svector-python
cd svector-python
pip install -e ".[dev]"
With Optional Dependencies
# Install with development tools
pip install "svector-sdk[dev]"
# Install with test dependencies
pip install "svector-sdk[test]"
Authentication
Environment Variable (Recommended)
Set your API key as an environment variable:
export SVECTOR_API_KEY="your-api-key-here"
from svector import SVECTOR
# Automatically reads from SVECTOR_API_KEY environment variable
client = SVECTOR()
Direct API Key
from svector import SVECTOR
client = SVECTOR(api_key="your-api-key-here")
Get Your API Key
- Visit the SVECTOR Dashboard
- Navigate to API Keys section
- Generate a new API key
- Copy and securely store your key
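Before constructing a client, it can help to fail fast when the key is missing. A minimal startup check; the error message is illustrative:
import os
from svector import SVECTOR

api_key = os.environ.get("SVECTOR_API_KEY")
if not api_key:
    raise RuntimeError(
        "SVECTOR_API_KEY is not set. Export it before starting the application."
    )
client = SVECTOR(api_key=api_key)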
Quick Start Examples
Basic Conversation
from svector import SVECTOR
client = SVECTOR()
response = client.conversations.create(
model="spec-3-turbo",
instructions="You are a helpful AI assistant that explains complex topics clearly.",
input="What is artificial intelligence?",
temperature=0.7,
)
print(response.output)
print(f"Request ID: {response.request_id}")
print(f"Token Usage: {response.usage}")
Advanced Configuration
from svector import SVECTOR
client = SVECTOR(
api_key="your-api-key",
base_url="https://spec-chat.tech",
timeout=30,
max_retries=3,
verify_ssl=True,
)
response = client.conversations.create(
model="spec-3-turbo",
instructions="You are an expert software architect with deep knowledge of distributed systems.",
input="Design a microservices architecture for a high-traffic e-commerce platform.",
temperature=0.8,
max_tokens=1000,
)
print(response.output)
Core Features
1. Conversations API (Recommended)
The Conversations API provides a user-friendly interface with automatic role management:
Basic Usage
response = client.conversations.create(
model="spec-3-turbo",
instructions="You are a programming tutor that helps students learn Python.",
input="How do I create a class in Python?",
temperature=0.6,
)
print(response.output)
With Context History
response = client.conversations.create(
model="spec-3-turbo",
instructions="You are a technical mentor providing step-by-step guidance.",
input="Can you show me a practical example?",
context=[
"How do I implement error handling in Python?",
"You can use try-except blocks to handle errors in Python..."
],
temperature=0.5,
)
print(response.output)
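Since context is a flat list of alternating user and assistant messages, a small wrapper can maintain it across turns. A sketch assuming that alternating convention, which the example above implies:
class ConversationSession:
    """Keeps alternating user/assistant strings for the context parameter."""

    def __init__(self, client, instructions: str):
        self.client = client
        self.instructions = instructions
        self.context = []

    def send(self, message: str) -> str:
        response = self.client.conversations.create(
            model="spec-3-turbo",
            instructions=self.instructions,
            input=message,
            context=self.context,
        )
        # Record both sides of the turn so the next call sees full history
        self.context.extend([message, response.output])
        return response.output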
Streaming Conversations
stream = client.conversations.create_stream(
model="spec-3-turbo",
instructions="You are a creative storyteller.",
input="Tell me a short story about AI and humanity.",
stream=True,
)
print("Story: ", end="", flush=True)
for event in stream:
if not event.done:
print(event.content, end="", flush=True)
else:
print("\n Story completed!")
2. Advanced Chat Completions
For applications requiring full control over conversation flow:
Role-Based Messages
response = client.chat.create(
model="spec-3-turbo",
messages=[
{"role": "system", "content": "You are an expert DevOps engineer specializing in Kubernetes and cloud infrastructure."},
{"role": "user", "content": "Design a CI/CD pipeline for a microservices application using GitOps principles."},
{"role": "assistant", "content": "I'll design a comprehensive GitOps-based CI/CD pipeline for you..."},
{"role": "user", "content": "How would you handle rollbacks in this setup?"}
],
temperature=0.7,
max_tokens=1500,
)
print(response["choices"][0]["message"]["content"])
Multi-turn Conversation
conversation = [
{"role": "system", "content": "You are a helpful programming assistant."},
{"role": "user", "content": "How do I reverse a string in Python?"},
{"role": "assistant", "content": "You can reverse a string using slicing: string[::-1]"},
{"role": "user", "content": "Can you show me other methods?"}
]
response = client.chat.create(
model="spec-3-turbo",
messages=conversation,
temperature=0.5,
)
print(response["choices"][0]["message"]["content"])
Developer Role (System-level Instructions)
response = client.chat.create(
model="spec-3-turbo",
messages=[
{"role": "developer", "content": "You are an expert code reviewer. Provide detailed feedback on code quality, security, and best practices."},
{"role": "user", "content": "Please review this Python code: def add(a, b): return a + b"}
],
temperature=0.3,
)
print(response["choices"][0]["message"]["content"])
3. Real-time Streaming
Chat Streaming
stream = client.chat.create_stream(
model="spec-3-turbo",
messages=[
{"role": "system", "content": "You are a helpful programming assistant."},
{"role": "user", "content": "Explain the differences between SQL and NoSQL databases with practical examples."}
],
stream=True,
)
print("Response: ", end="", flush=True)
for event in stream:
if "choices" in event and len(event["choices"]) > 0:
delta = event["choices"][0].get("delta", {})
content = delta.get("content", "")
if content:
print(content, end="", flush=True)
print()
4. Document Processing & Analysis
Upload and Process Documents
# Upload from file
with open("research-paper.pdf", "rb") as f:
file_response = client.files.create(f, purpose="default")
file_id = file_response.file_id
print(f"File uploaded: {file_id}")
# Ask questions about the document
response = client.conversations.create(
model="spec-3-turbo",
instructions="You are a research assistant that analyzes academic papers.",
input="What are the key findings and methodology in this paper?",
files=[{"type": "file", "id": file_id}],
temperature=0.3,
)
print(response.output)
Multiple Document Analysis
# Upload multiple documents
documents = []
for file_path in ["manual.pdf", "faq.docx", "notes.txt"]:
with open(file_path, "rb") as f:
file_response = client.files.create(f, purpose="default")
documents.append(file_response.file_id)
# Analyze all documents together
response = client.conversations.create(
model="spec-3-turbo",
instructions="You are a document analyst. Provide comprehensive analysis and cross-reference information from all documents.",
input="Compare and contrast the information across all documents. What are the key themes and any discrepancies?",
files=[{"type": "file", "id": doc_id} for doc_id in documents],
temperature=0.2,
)
print(response.output)
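For larger document sets, you can walk a directory and upload every supported file, skipping failures. A sketch using pathlib; the extension list is an assumption for illustration, not an API restriction:
from pathlib import Path

def upload_directory(client, directory: str, extensions=(".pdf", ".docx", ".txt", ".md")) -> list:
    """Upload every matching file in a directory; return the file IDs."""
    file_ids = []
    for path in sorted(Path(directory).iterdir()):
        if path.suffix.lower() not in extensions:
            continue
        try:
            with open(path, "rb") as f:
                file_response = client.files.create(f, purpose="default")
            file_ids.append(file_response.file_id)
            print(f"Uploaded {path.name} -> {file_response.file_id}")
        except Exception as error:
            print(f"Skipping {path.name}: {error}")
    return file_ids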
Upload from String Content
content = """
# Research Notes
This document contains important findings from our Q4 analysis:
1. Revenue increased by 25%
2. Customer satisfaction improved by 15%
3. New product adoption rate: 40%
"""
file_response = client.files.create(
content.encode(),
purpose="default",
filename="research-notes.md"
)
# Analyze the content
response = client.conversations.create(
model="spec-3-turbo",
instructions="You are a business analyst. Analyze the data and provide insights.",
input="What are the key business metrics and their implications?",
files=[{"type": "file", "id": file_response.file_id}],
)
print(response.output)
Available Models & Selection Guide
Model Specifications
Model | Use Case | Performance | Context Length | Best For |
---|---|---|---|---|
spec-3-turbo | General purpose, fast responses | High speed, good quality | 1M tokens | Quick responses, general tasks |
spec-3 | Balanced performance and quality | Standard speed, high quality | 1M tokens | Most applications |
theta-35 | Complex reasoning, analysis | Slower, highest quality | 40K tokens | Research, complex analysis |
theta-35-mini | Simple tasks, cost-effective | Very fast, basic quality | 40K tokens | Simple Q&A, quick tasks |
spec-2-mini | Super fast responses | High speed, basic quality | 32K tokens | Latency-sensitive tasks |
List Available Models
# List all available models
models = client.models.list()
print("Available models:")
for model in models["models"]:
print(f"- {model['id']}: {model.get('description', 'No description available')}")
Model Selection Strategy
def select_optimal_model(task_type: str, complexity: str) -> str:
"""Select the best model based on task type and complexity."""
model_map = {
'conversation': {
'simple': 'theta-35-mini',
'moderate': 'spec-3-turbo',
'complex': 'spec-3'
},
'analysis': {
'simple': 'spec-3-turbo',
'moderate': 'spec-3',
'complex': 'theta-35'
},
'coding': {
'simple': 'spec-3-turbo',
'moderate': 'spec-3',
'complex': 'theta-35'
}
}
return model_map.get(task_type, {}).get(complexity, 'spec-3-turbo')
# Usage examples
quick_model = select_optimal_model('conversation', 'simple')
analysis_model = select_optimal_model('analysis', 'complex')
print(f"Quick conversations: {quick_model}")
print(f"Complex analysis: {analysis_model}")
Error Handling & Resilience
Comprehensive Error Handling
from svector import (
SVECTOR,
AuthenticationError,
RateLimitError,
NotFoundError,
UnprocessableEntityError,
APIError,
APIConnectionError,
APIConnectionTimeoutError
)
def robust_api_call(client, prompt: str, max_retries: int = 3):
"""Make API calls with comprehensive error handling and retry logic."""
import time
for attempt in range(max_retries):
try:
response = client.conversations.create(
model="spec-3-turbo",
instructions="You are a helpful assistant.",
input=prompt,
)
return response
except AuthenticationError as e:
print(f"Authentication failed: {e}")
print("Please check your API key at https://www.svector.co.in")
raise
except RateLimitError as e:
print(f"⏱️ Rate limit exceeded: {e}")
if attempt < max_retries - 1:
wait_time = 2 ** attempt # Exponential backoff
print(f"Waiting {wait_time} seconds before retry...")
time.sleep(wait_time)
continue
raise
except APIConnectionError as e:
print(f"🔌 Connection error: {e}")
if attempt < max_retries - 1:
wait_time = 1 * (attempt + 1)
print(f"Retrying in {wait_time} seconds...")
time.sleep(wait_time)
continue
raise
except NotFoundError as e:
print(f"Resource not found: {e}")
raise
except UnprocessableEntityError as e:
print(f"Invalid request: {e}")
raise
except APIError as e:
print(f"API error: {e} (Status: {e.status_code})")
print(f"Request ID: {getattr(e, 'request_id', 'N/A')}")
if attempt < max_retries - 1:
time.sleep(2 ** attempt)
continue
raise
except Exception as e:
print(f"Unexpected error: {e}")
raise
# Usage
client = SVECTOR()
try:
result = robust_api_call(client, "Explain quantum computing in simple terms.")
print(" Success:", result.output)
except Exception as e:
print(f"Final error: {e}")
Async Support
Basic Async Usage
import asyncio
from svector import AsyncSVECTOR
async def async_conversation():
async with AsyncSVECTOR() as client:
response = await client.conversations.create(
model="spec-3-turbo",
instructions="You are a helpful AI assistant.",
input="Explain the benefits of asynchronous programming.",
temperature=0.7,
)
print(response.output)
return response
# Run async function
asyncio.run(async_conversation())
Async Streaming
async def async_streaming():
async with AsyncSVECTOR() as client:
stream = await client.conversations.create_stream(
model="spec-3-turbo",
instructions="You are a creative storyteller.",
input="Write a poem about the future of AI.",
stream=True,
)
print("Poem: ", end="", flush=True)
async for event in stream:
if not event.done:
print(event.content, end="", flush=True)
print()
asyncio.run(async_streaming())
Concurrent Async Requests
async def concurrent_requests():
async with AsyncSVECTOR() as client:
# Multiple concurrent conversations
topics = ["artificial intelligence", "quantum computing", "blockchain", "machine learning"]
tasks = [
client.conversations.create(
model="spec-3-turbo",
instructions="You are an expert educator. Explain complex topics clearly.",
input=f"Explain {topic} in simple terms with practical examples.",
temperature=0.6,
)
for topic in topics
]
print("🚀 Starting concurrent requests...")
responses = await asyncio.gather(*tasks, return_exceptions=True)
for topic, response in zip(topics, responses):
if isinstance(response, Exception):
print(f"{topic}: Error - {response}")
else:
print(f" {topic}: {response.output[:100]}...")
print("─" * 50)
asyncio.run(concurrent_requests())
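Unbounded gather calls can trip rate limits on large batches. An asyncio.Semaphore caps the number of in-flight requests; the limit of 3 below is an arbitrary illustrative value:
import asyncio
from svector import AsyncSVECTOR

async def bounded_requests(topics, max_concurrent: int = 3):
    semaphore = asyncio.Semaphore(max_concurrent)
    async with AsyncSVECTOR() as client:
        async def explain(topic: str):
            async with semaphore:  # at most max_concurrent calls in flight
                return await client.conversations.create(
                    model="spec-3-turbo",
                    instructions="You are an expert educator.",
                    input=f"Explain {topic} in simple terms.",
                )
        return await asyncio.gather(
            *(explain(t) for t in topics), return_exceptions=True
        )

asyncio.run(bounded_requests(["AI", "quantum computing", "blockchain"]))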
Complete Production Examples
Enterprise Document Analysis System
from svector import SVECTOR, AuthenticationError, RateLimitError
from pathlib import Path
import time
from typing import List, Dict, Any, Optional
class EnterpriseDocumentAnalyzer:
"""Enterprise-grade document analysis system with SVECTOR AI."""
def __init__(self, api_key: Optional[str] = None):
self.client = SVECTOR(
api_key=api_key,
timeout=60,
max_retries=3,
)
self.uploaded_files: List[str] = []
self.analysis_cache: Dict[str, Any] = {}
def upload_document(self, file_path: str) -> str:
"""Upload a document and return its file ID."""
try:
with open(file_path, "rb") as f:
file_response = self.client.files.create(
f,
purpose="default",
filename=Path(file_path).name
)
self.uploaded_files.append(file_response.file_id)
print(f" Uploaded: {file_path} (ID: {file_response.file_id})")
return file_response.file_id
except Exception as error:
print(f"Failed to upload {file_path}: {error}")
raise
def upload_document_from_text(self, content: str, filename: str) -> str:
"""Upload text content as a document."""
file_response = self.client.files.create(
content.encode(),
purpose="default",
filename=filename
)
self.uploaded_files.append(file_response.file_id)
print(f" Created document: {filename} (ID: {file_response.file_id})")
return file_response.file_id
def analyze_documents(self,
query: str,
analysis_type: str = "comprehensive",
file_ids: Optional[List[str]] = None) -> Dict[str, Any]:
"""Analyze uploaded documents with specified query and analysis type."""
# Use provided file IDs or all uploaded files
target_files = file_ids or self.uploaded_files
if not target_files:
raise ValueError("No documents available for analysis")
# Define analysis instructions
instructions = {
"summary": "You are an expert document summarizer. Provide clear, structured summaries with key points and actionable insights.",
"technical": "You are a technical analyst. Focus on technical details, methodologies, and implementation aspects.",
"business": "You are a business analyst. Analyze from a strategic perspective, focusing on opportunities, risks, and business implications.",
"comprehensive": "You are a senior research analyst. Provide detailed analysis with insights, patterns, recommendations, and actionable next steps.",
"comparative": "You are a comparative analyst. Compare and contrast documents, identify patterns, differences, and relationships."
}
# Create cache key
cache_key = f"{analysis_type}:{hash(query + ''.join(target_files))}"
# Check cache
if cache_key in self.analysis_cache:
print("📋 Retrieved from cache")
return self.analysis_cache[cache_key]
try:
print(f"🔍 Analyzing {len(target_files)} documents...")
response = self.client.conversations.create(
model="spec-3-turbo",
instructions=instructions.get(analysis_type, instructions["comprehensive"]),
input=query,
files=[{"type": "file", "id": file_id} for file_id in target_files],
temperature=0.2, # Lower temperature for factual analysis
)
result = {
"query": query,
"analysis_type": analysis_type,
"analysis": response.output,
"document_count": len(target_files),
"request_id": response.request_id,
"usage": response.usage,
"timestamp": time.time()
}
# Cache the result
self.analysis_cache[cache_key] = result
return result
except Exception as error:
print(f"Analysis failed: {error}")
raise
def compare_documents(self,
comparison_query: str,
file_ids: Optional[List[str]] = None) -> Dict[str, Any]:
"""Compare multiple documents."""
target_files = file_ids or self.uploaded_files
if len(target_files) < 2:
raise ValueError("Need at least 2 documents for comparison")
return self.analyze_documents(
f"Compare and contrast the documents regarding: {comparison_query}",
"comparative",
target_files
)
def batch_analyze(self, queries: List[str], analysis_type: str = "comprehensive") -> List[Dict[str, Any]]:
"""Perform batch analysis with multiple queries."""
results = []
for i, query in enumerate(queries, 1):
try:
print(f"🔄 Processing query {i}/{len(queries)}: {query[:50]}...")
result = self.analyze_documents(query, analysis_type)
results.append(result)
# Rate limiting - wait between requests
if i < len(queries):
time.sleep(1)
except Exception as error:
print(f"Failed to process query {i}: {error}")
results.append({
"query": query,
"error": str(error),
"success": False
})
return results
def generate_report(self, results: List[Dict[str, Any]]) -> str:
"""Generate a comprehensive report from analysis results."""
report_content = []
for i, result in enumerate(results, 1):
if result.get("success", True):
report_content.append(f"## Analysis {i}: {result['query']}")
report_content.append(f"**Type:** {result['analysis_type']}")
report_content.append(f"**Documents:** {result['document_count']}")
report_content.append(f"**Analysis:**")
report_content.append(result['analysis'])
report_content.append("---")
else:
report_content.append(f"## Analysis {i}: FAILED")
report_content.append(f"**Query:** {result['query']}")
report_content.append(f"**Error:** {result['error']}")
report_content.append("---")
return "\n\n".join(report_content)
def get_statistics(self) -> Dict[str, Any]:
"""Get usage statistics."""
return {
"uploaded_files": len(self.uploaded_files),
"cached_analyses": len(self.analysis_cache),
"file_ids": self.uploaded_files.copy()
}
# Usage Example
def main():
# Initialize analyzer
analyzer = EnterpriseDocumentAnalyzer()
# Upload documents
try:
# Upload various document types
analyzer.upload_document("./reports/quarterly-report.pdf")
analyzer.upload_document("./reports/annual-summary.docx")
# Upload text content
analyzer.upload_document_from_text("""
# Executive Summary
Q4 Performance:
- Revenue: $2.5M (↑15%)
- Customer Acquisition: 1,200 new customers
- Product Launches: 3 successful launches
- Market Expansion: 2 new regions
Key Challenges:
- Supply chain disruptions
- Increased competition
- Talent acquisition
""", "executive-summary.md")
# Single analysis
result = analyzer.analyze_documents(
"What are the key business metrics and their trends?",
"business"
)
print(f"📊 Analysis Result:")
print(result["analysis"])
# Batch analysis
queries = [
"What are the major achievements in Q4?",
"What challenges were identified and how were they addressed?",
"What are the strategic priorities for the next quarter?"
]
batch_results = analyzer.batch_analyze(queries, "business")
# Generate comprehensive report
report = analyzer.generate_report(batch_results)
# Save report
with open("analysis_report.md", "w") as f:
f.write(report)
print(" Analysis complete! Report saved to analysis_report.md")
# Print statistics
stats = analyzer.get_statistics()
print(f"Statistics: {stats}")
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
main()
Intelligent Chat Application
from svector import SVECTOR, RateLimitError
import json
from typing import List, Dict, Optional
from datetime import datetime
class IntelligentChatBot:
"""Enterprise-grade chatbot with conversation management."""
def __init__(self, api_key: Optional[str] = None):
self.client = SVECTOR(api_key=api_key, timeout=30, max_retries=3)
self.conversations: Dict[str, List[Dict]] = {}
self.default_instructions = "You are a helpful, knowledgeable AI assistant. Be conversational, accurate, and helpful."
def create_conversation(self, conversation_id: str, system_instructions: Optional[str] = None) -> str:
"""Create a new conversation thread."""
self.conversations[conversation_id] = []
instructions = system_instructions or self.default_instructions
# Add system message
self.conversations[conversation_id].append({
"role": "system",
"content": instructions,
"timestamp": datetime.now().isoformat()
})
return conversation_id
def chat(self,
conversation_id: str,
user_message: str,
stream: bool = False,
model: str = "spec-3-turbo") -> str:
"""Send a message and get a response."""
# Create conversation if it doesn't exist
if conversation_id not in self.conversations:
self.create_conversation(conversation_id)
# Add user message
self.conversations[conversation_id].append({
"role": "user",
"content": user_message,
"timestamp": datetime.now().isoformat()
})
try:
            # Get recent context (last 20 messages), excluding the system message
            recent_messages = self.conversations[conversation_id][-20:]
            context = [msg["content"] for msg in recent_messages if msg["role"] != "system"]
if stream:
return self._stream_response(conversation_id, user_message, context, model)
else:
return self._get_response(conversation_id, user_message, context, model)
except RateLimitError as e:
error_msg = f"Rate limit exceeded. Please wait before sending another message."
self.conversations[conversation_id].append({
"role": "error",
"content": error_msg,
"timestamp": datetime.now().isoformat()
})
return error_msg
except Exception as e:
error_msg = f"An error occurred: {str(e)}"
self.conversations[conversation_id].append({
"role": "error",
"content": error_msg,
"timestamp": datetime.now().isoformat()
})
return error_msg
def _get_response(self, conversation_id: str, user_message: str, context: List[str], model: str) -> str:
"""Get a standard response."""
system_instructions = self.conversations[conversation_id][0]["content"]
response = self.client.conversations.create(
model=model,
instructions=system_instructions,
input=user_message,
context=context[-10:], # Last 10 messages for context
temperature=0.7,
)
# Add AI response to conversation
self.conversations[conversation_id].append({
"role": "assistant",
"content": response.output,
"timestamp": datetime.now().isoformat(),
"usage": response.usage
})
return response.output
def _stream_response(self, conversation_id: str, user_message: str, context: List[str], model: str) -> str:
"""Get a streaming response."""
system_instructions = self.conversations[conversation_id][0]["content"]
stream = self.client.conversations.create_stream(
model=model,
instructions=system_instructions,
input=user_message,
context=context[-10:],
stream=True,
)
print("Assistant: ", end="", flush=True)
full_response = ""
for event in stream:
if not event.done:
print(event.content, end="", flush=True)
full_response += event.content
print()
# Add AI response to conversation
self.conversations[conversation_id].append({
"role": "assistant",
"content": full_response,
"timestamp": datetime.now().isoformat()
})
return full_response
def get_conversation_history(self, conversation_id: str) -> List[Dict]:
"""Get full conversation history."""
return self.conversations.get(conversation_id, [])
def clear_conversation(self, conversation_id: str) -> None:
"""Clear a conversation."""
if conversation_id in self.conversations:
system_msg = self.conversations[conversation_id][0] # Keep system message
self.conversations[conversation_id] = [system_msg]
def export_conversation(self, conversation_id: str, file_path: str) -> None:
"""Export conversation to JSON file."""
if conversation_id in self.conversations:
with open(file_path, 'w') as f:
json.dump(self.conversations[conversation_id], f, indent=2)
def list_conversations(self) -> List[str]:
"""List all conversation IDs."""
return list(self.conversations.keys())
# Usage Example
def main():
# Initialize chatbot
chatbot = IntelligentChatBot()
# Create specialized conversations
chatbot.create_conversation(
"tech-support",
"You are a technical support specialist. Help users with technical issues in a clear, step-by-step manner."
)
chatbot.create_conversation(
"code-review",
"You are a senior software engineer specializing in code review. Provide constructive feedback on code quality, best practices, and security."
)
# Interactive chat session
print("🤖 SVECTOR Intelligent ChatBot")
print("Available conversations: tech-support, code-review, general")
print("Commands: /history, /clear, /export, /quit")
conversation_id = "general"
chatbot.create_conversation(conversation_id)
while True:
user_input = input(f"\n[{conversation_id}] You: ")
if user_input.lower() == '/quit':
break
elif user_input.lower() == '/history':
history = chatbot.get_conversation_history(conversation_id)
for msg in history[1:]: # Skip system message
print(f"{msg['role']}: {msg['content'][:100]}...")
continue
elif user_input.lower() == '/clear':
chatbot.clear_conversation(conversation_id)
print("Conversation cleared.")
continue
elif user_input.lower().startswith('/export'):
filename = f"conversation_{conversation_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
chatbot.export_conversation(conversation_id, filename)
print(f"Conversation exported to {filename}")
continue
elif user_input.lower().startswith('/switch'):
new_conv_id = user_input.split()[-1]
if new_conv_id in chatbot.list_conversations():
conversation_id = new_conv_id
print(f"Switched to conversation: {conversation_id}")
else:
print(f"Unknown conversation: {new_conv_id}")
continue
# Get response (streaming)
response = chatbot.chat(conversation_id, user_input, stream=True)
if __name__ == "__main__":
main()
Advanced Configuration
Client Configuration
from svector import SVECTOR
client = SVECTOR(
api_key="your-api-key",
base_url="https://spec-chat.tech", # Custom API endpoint
timeout=60, # Request timeout in seconds
max_retries=5, # Retry failed requests
verify_ssl=True, # SSL verification
http_client=None, # Custom HTTP client
)
Per-Request Configuration
response = client.conversations.create(
model="spec-3-turbo",
instructions="You are a helpful assistant.",
input="Hello world",
timeout=120, # Override timeout for this request
headers={ # Additional headers
"X-Custom-Header": "value",
"X-Request-Source": "my-enterprise-app"
}
)
Environment-Based Configuration
import os
from svector import SVECTOR
def create_client(environment: str = "production") -> SVECTOR:
"""Create SVECTOR client based on environment."""
configs = {
"development": {
"api_key": os.getenv("SVECTOR_DEV_API_KEY"),
"base_url": "https://dev-api.spec-chat.tech",
"timeout": 30,
"max_retries": 2,
},
"staging": {
"api_key": os.getenv("SVECTOR_STAGING_API_KEY"),
"base_url": "https://staging-api.spec-chat.tech",
"timeout": 45,
"max_retries": 3,
},
"production": {
"api_key": os.getenv("SVECTOR_API_KEY"),
"base_url": "https://spec-chat.tech",
"timeout": 60,
"max_retries": 5,
}
}
config = configs.get(environment, configs["production"])
return SVECTOR(**config)
# Usage
client = create_client("production")
Best Practices
1. Use Environment Variables
import os
from svector import SVECTOR
# Recommended
client = SVECTOR(api_key=os.environ.get("SVECTOR_API_KEY"))
# Never do this
client = SVECTOR(api_key="sk-hardcoded-key-here")
2. Implement Proper Error Handling
from svector import SVECTOR, RateLimitError, AuthenticationError
import time
def safe_api_call(client, prompt, max_retries=3):
"""Safe API call with retry logic."""
for attempt in range(max_retries):
try:
return client.conversations.create(
model="spec-3-turbo",
instructions="You are helpful.",
input=prompt
)
except RateLimitError:
if attempt < max_retries - 1:
wait_time = 2 ** attempt
time.sleep(wait_time)
else:
raise
except AuthenticationError:
# Don't retry authentication errors
raise
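The same retry policy can be factored into a decorator so call sites stay clean. A minimal sketch using only the standard library and the exception classes shown earlier:
import functools
import time
from svector import RateLimitError, APIConnectionError

def with_retries(max_retries: int = 3):
    """Retry transient failures with exponential backoff."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except (RateLimitError, APIConnectionError):
                    if attempt == max_retries - 1:
                        raise
                    time.sleep(2 ** attempt)  # exponential backoff
        return wrapper
    return decorator

@with_retries(max_retries=3)
def summarize(client, text: str):
    return client.conversations.create(
        model="spec-3-turbo",
        instructions="You are a concise summarizer.",
        input=text,
    )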
3. Use Context Managers for Async
# Recommended
async with AsyncSVECTOR() as client:
response = await client.conversations.create(...)
# Manual cleanup required
client = AsyncSVECTOR()
try:
response = await client.conversations.create(...)
finally:
await client.close()
4. Select Appropriate Models
# For quick responses
model = "spec-3-turbo"
# For complex analysis
model = "theta-35"
# For simple tasks
model = "theta-35-mini"
5. Cache File Uploads
# Upload once, use multiple times
file_cache = {}
def get_or_upload_file(client, file_path):
if file_path not in file_cache:
with open(file_path, "rb") as f:
response = client.files.create(f, purpose="default")
file_cache[file_path] = response.file_id
return file_cache[file_path]
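If uploads should survive process restarts, the cache can be persisted to disk. A simple JSON-backed variant; the cache file name is arbitrary:
import json
from pathlib import Path

CACHE_PATH = Path(".svector_file_cache.json")

def get_or_upload_file_persistent(client, file_path: str) -> str:
    """Like get_or_upload_file, but the path-to-ID map survives restarts."""
    cache = json.loads(CACHE_PATH.read_text()) if CACHE_PATH.exists() else {}
    if file_path not in cache:
        with open(file_path, "rb") as f:
            response = client.files.create(f, purpose="default")
        cache[file_path] = response.file_id
        CACHE_PATH.write_text(json.dumps(cache, indent=2))
    return cache[file_path]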
Testing
Unit Tests
import pytest
from svector import SVECTOR, AuthenticationError
def test_client_initialization():
client = SVECTOR(api_key="test-key")
assert client.api_key == "test-key"
def test_invalid_api_key():
client = SVECTOR(api_key="invalid-key")
with pytest.raises(AuthenticationError):
client.conversations.create(
model="spec-3-turbo",
instructions="Test",
input="Test"
)
# Run tests
# pytest test_svector.py
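To unit-test your own code without network calls, you can stub the client with unittest.mock. This sketch assumes only the response attributes used earlier (output, request_id):
from unittest.mock import MagicMock

def test_chat_logic_without_network():
    # Stand-in client: no API key or network access required
    client = MagicMock()
    client.conversations.create.return_value = MagicMock(
        output="Hello!", request_id="req-123"
    )
    response = client.conversations.create(
        model="spec-3-turbo", instructions="Test", input="Hi"
    )
    assert response.output == "Hello!"
    client.conversations.create.assert_called_once()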
Integration Tests
import os
from svector import SVECTOR
def test_basic_conversation():
client = SVECTOR(api_key=os.environ.get("SVECTOR_TEST_API_KEY"))
response = client.conversations.create(
model="spec-3-turbo",
instructions="You are a test assistant.",
input="Say 'Hello, World!'"
)
assert response.output
assert "Hello" in response.output
assert response.request_id
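Integration tests should skip automatically when no test key is configured, so CI runs without credentials still pass. A common pytest pattern:
import os
import pytest

requires_api_key = pytest.mark.skipif(
    not os.environ.get("SVECTOR_TEST_API_KEY"),
    reason="SVECTOR_TEST_API_KEY is not set",
)

@requires_api_key
def test_streaming_conversation():
    from svector import SVECTOR
    client = SVECTOR(api_key=os.environ["SVECTOR_TEST_API_KEY"])
    stream = client.conversations.create_stream(
        model="spec-3-turbo",
        instructions="You are a test assistant.",
        input="Count to three.",
        stream=True,
    )
    chunks = [event.content for event in stream if not event.done]
    assert "".join(chunks)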
Links & Support
- Support: support@svector.co.in
- Issues: GitHub Issues
- PyPI Package: svector-sdk
- Source Code: GitHub Repository