Agentbase Library
Agentbase has one key objective: to enable asynchronous calls to multiple language models that work collaboratively on shared objects while each keeps its own memory and instructions. The command structure is the foundation: it lets multiple agents process data iteratively, each with its own specialized context and capabilities, within a single unified workflow.
This library provides a collection of utilities and core components to simplify working with the Agentbase paradigm, standardize implementation across APIs, and reduce boilerplate code.
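A rough sketch of that objective may help before the components are introduced. It assumes nothing about the real classes: `SketchAgent` and `SharedDocument` are hypothetical stand-ins, and the awaited `Promise.resolve()` takes the place of an actual model call.

```typescript
// Hypothetical stand-in for an object that several agents work on together.
interface SharedDocument {
  notes: string[];
}

// Hypothetical agent: private memory and instructions, shared work item.
class SketchAgent {
  readonly name: string;
  private instructions: string;
  private memory: string[] = [];

  constructor(name: string, instructions: string) {
    this.name = name;
    this.instructions = instructions;
  }

  async contribute(doc: SharedDocument): Promise<void> {
    // Stands in for an awaited model call; no real LLM is contacted.
    await Promise.resolve();
    const note = `${this.name}: ${this.instructions}`;
    this.memory.push(note); // visible only to this agent
    doc.notes.push(note);   // visible to every agent
  }

  recall(): string[] {
    return [...this.memory];
  }
}

async function runSketch() {
  const doc: SharedDocument = { notes: [] };
  const extractor = new SketchAgent('extractor', 'pull out key facts');
  const reviewer = new SketchAgent('reviewer', 'check the extracted facts');
  // Both agents run concurrently against the same document.
  await Promise.all([extractor.contribute(doc), reviewer.contribute(doc)]);
  return { doc, extractor, reviewer };
}
```

Each agent appends to the same document but keeps its own memory, which is the separation that BaseAgent and MemoryStore are meant to provide.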
Core Components
BaseAgent
The foundation class for all agent implementations:
export abstract class BaseAgent {
id: string;
name: string;
capabilities: string[];
memoryStore: MemoryStore;
constructor(id: string, name: string, capabilities: string[] = []) {
this.id = id;
this.name = name;
this.capabilities = capabilities;
this.memoryStore = new MemoryStore(id);
}
/**
* Validates that a command can be executed by this agent
*/
validateCommandCapabilities(command: DbCommand): boolean {
// Implementation to check if the command's required capabilities
// are supported by this agent
return true;
}
/**
* Abstract method that must be implemented by all agent types
*/
abstract executeCommand(command: DbCommand): Promise<CommandExecutionResult>;
/**
* Helper method to execute tools in sequence
*/
async executeTools(tools: any[]): Promise<ToolExecutionResult[]> {
const results: ToolExecutionResult[] = [];
for (const tool of tools) {
try {
// Update tool status to running
tool.status = 'running';
// Execute the tool
const result = await this.executeTool(tool);
// Mark as completed and save result
tool.status = 'completed';
results.push({
tool: tool.name,
status: 'completed',
result
});
} catch (error) {
tool.status = 'failed';
results.push({
tool: tool.name,
status: 'failed',
error: error instanceof Error ? error.message : String(error)
});
}
}
return results;
}
/**
* Executes a single tool
*/
async executeTool(tool: any): Promise<any> {
// Tool execution implementation
throw new Error('Tool execution not implemented');
}
/**
* Process target outputs based on tool results
*/
async processTargets(targets: any[], toolResults: ToolExecutionResult[]): Promise<any[]> {
// Target processing implementation
return [];
}
/**
* Consult supervisors for approval or modification of results
*/
async consultSupervisors(supervisors: any[], results: any[]): Promise<any[]> {
// Supervisor consultation implementation
return results;
}
}
CommandFactory
Utility to simplify the creation of standardized command objects:
export class CommandFactory {
/**
* Creates a new command with required fields
*/
static createCommand(params: {
task: string;
userId: string;
description?: string;
targets?: any[];
tools?: any[];
context?: string;
supervisor?: any[];
agentId?: string;
model?: string;
priority?: number;
executionOrder?: string[];
supervisionParams?: SupervisionParams;
}): CreateCommandParams {
return {
task: params.task,
status: 'pending',
description: params.description,
targets: params.targets || [],
tools: params.tools || [],
context: params.context,
supervisor: params.supervisor,
model: params.model,
agent_id: params.agentId,
user_id: params.userId,
priority: params.priority ?? 5,
execution_order: params.executionOrder,
supervision_params: params.supervisionParams
};
}
}
CommandService
Service to manage command lifecycle:
export class CommandService {
private eventEmitter: EventEmitter;
constructor() {
this.eventEmitter = new EventEmitter();
}
/**
* Submit a command for execution
*/
async submitCommand(command: CreateCommandParams): Promise<string> {
const createdCommand = await createCommand(command);
this.eventEmitter.emit('commandCreated', createdCommand);
return createdCommand.id;
}
/**
* Update the status of a command
*/
async updateStatus(commandId: string, status: CommandStatus): Promise<boolean> {
const updated = await updateCommandStatus(commandId, status);
if (updated) {
this.eventEmitter.emit('statusChange', { id: commandId, status });
}
return updated;
}
/**
* Update a tool's status within a command
*/
async updateToolStatus(
commandId: string,
toolName: string,
status: string,
result?: any
): Promise<boolean> {
const command = await getCommandById(commandId);
if (!command) {
return false;
}
const tools = [...(command.tools || [])];
const toolIndex = tools.findIndex(t => t.name === toolName);
if (toolIndex === -1) {
return false;
}
tools[toolIndex] = {
...tools[toolIndex],
status,
result
};
const { error } = await supabaseAdmin
.from('commands')
.update({ tools })
.eq('id', commandId);
return !error;
}
/**
* Update command with results
*/
async updateCommand(
commandId: string,
updates: Partial<Omit<DbCommand, 'id' | 'created_at' | 'updated_at'>>
): Promise<DbCommand> {
const updated = await updateCommand(commandId, updates);
this.eventEmitter.emit('commandUpdated', updated);
return updated;
}
/**
* Register event listener
*/
on(event: string, listener: (...args: any[]) => void): void {
this.eventEmitter.on(event, listener);
}
/**
* Remove event listener
*/
off(event: string, listener: (...args: any[]) => void): void {
this.eventEmitter.off(event, listener);
}
/**
* Update command execution order
*/
async updateExecutionOrder(commandId: string, executionOrder: string[]): Promise<boolean> {
const { error } = await supabaseAdmin
.from('commands')
.update({ execution_order: executionOrder })
.eq('id', commandId);
return !error;
}
/**
* Update command priority
*/
async updatePriority(commandId: string, priority: number): Promise<boolean> {
const { error } = await supabaseAdmin
.from('commands')
.update({ priority })
.eq('id', commandId);
return !error;
}
/**
* Request supervision for a command
*/
async requestSupervision(commandId: string, supervisionRequest: SupervisionRequest): Promise<SupervisionResponse> {
const command = await getCommandById(commandId);
if (!command) {
throw new Error(`Command not found: ${commandId}`);
}
// Implementation to request supervision
// This could involve notifying a supervisor agent or queuing for review
return {
requestId: generateId('supervision'),
status: 'pending',
commandId
};
}
/**
* Check supervision status
*/
async checkSupervisionStatus(requestId: string): Promise<SupervisionStatus> {
// Implementation to check supervision status
return 'pending';
}
/**
* Submit supervision decision
*/
async submitSupervisionDecision(
requestId: string,
decision: SupervisionDecision
): Promise<boolean> {
// Implementation to record supervisor decision
return true;
}
}
MemoryStore
Store for agent memory management:
export class MemoryStore {
private agentId: string;
constructor(agentId: string) {
this.agentId = agentId;
}
/**
* Store a memory item
*/
async store(params: {
userId: string;
type: string;
key: string;
data: any;
rawData?: string;
metadata?: any;
}): Promise<string> {
const memory = {
agent_id: this.agentId,
user_id: params.userId,
type: params.type,
key: params.key,
data: params.data,
raw_data: params.rawData,
metadata: params.metadata,
created_at: new Date().toISOString(),
updated_at: new Date().toISOString(),
access_count: 0
};
// Implementation to store in database
return 'memory_id';
}
/**
* Retrieve a memory item by key
*/
async retrieve(key: string): Promise<any> {
// Implementation to retrieve from database
return null;
}
/**
* Retrieve memories by type
*/
async retrieveByType(type: string, limit: number = 10): Promise<any[]> {
// Implementation to retrieve from database
return [];
}
/**
* Update a memory item
*/
async update(key: string, data: any, rawData?: string): Promise<boolean> {
// Implementation to update in database
return true;
}
}
Utility Functions
Command Utilities
/**
* Generates a unique command ID
*/
export function generateCommandId(): string {
return `cmd_${Date.now()}_${Math.random().toString(36).substring(2, 9)}`;
}
/**
* Calculates duration between start time and now
*/
export function calculateDuration(startTime: string): number {
const start = new Date(startTime).getTime();
const end = new Date().getTime();
return end - start;
}
/**
* Formats a command for client-side display
*/
export function formatCommandForDisplay(command: DbCommand): any {
return {
id: command.id,
task: command.task,
status: command.status,
description: command.description,
results: command.results,
created: command.created_at,
updated: command.updated_at,
duration: command.duration ? `${(command.duration / 1000).toFixed(2)}s` : null,
priority: command.priority,
executionOrder: command.execution_order
};
}
/**
* Sort commands by priority
*/
export function sortCommandsByPriority(commands: DbCommand[]): DbCommand[] {
return [...commands].sort((a, b) => (b.priority ?? 5) - (a.priority ?? 5));
}
/**
* Generate next step in execution order
*/
export function getNextExecutionStep(command: DbCommand): string | null {
if (!command.execution_order || command.execution_order.length === 0) {
return null;
}
// Find current step based on tool/target status
const executedTools = (command.tools || []).filter(tool =>
tool.status === 'completed' || tool.status === 'failed'
).map(tool => tool.name);
// Find first step in execution order that hasn't been executed
return command.execution_order.find(step => !executedTools.includes(step)) || null;
}
Tool Utilities
/**
* Create a tool definition
*/
export function createTool(params: {
name: string;
description: string;
type: 'synchronous' | 'asynchronous';
parameters?: Record<string, any>;
executionIndex?: number;
supervisionRequired?: boolean;
}): any {
return {
name: params.name,
description: params.description,
status: 'not_initialized',
type: params.type,
parameters: params.parameters || {},
execution_index: params.executionIndex,
supervision_required: params.supervisionRequired || false
};
}
/**
* Common tools for reuse
*/
export const commonTools = {
dataFetch: (params?: Record<string, any>) => createTool({
name: 'data_fetch',
description: 'Fetch data from external sources',
type: 'synchronous',
parameters: params
}),
search: (params?: Record<string, any>) => createTool({
name: 'search',
description: 'Search knowledge base for relevant information',
type: 'synchronous',
parameters: params
})
};
Supervision Utilities
/**
* Types for supervision
*/
export type SupervisionStatus = 'pending' | 'approved' | 'rejected' | 'modified';
export interface SupervisionRequest {
commandId: string;
results: any[];
context: string;
requestedBy: string;
priority: number;
}
export interface SupervisionResponse {
requestId: string;
status: SupervisionStatus;
commandId: string;
reviewedAt?: string;
reviewedBy?: string;
comments?: string;
modifications?: any[];
}
export interface SupervisionDecision {
status: 'approved' | 'rejected' | 'modified';
comments?: string;
modifications?: any[];
}
export interface SupervisionParams {
autoApproveThreshold?: number;
requireApprovalFor?: string[];
supervisorRoles: string[];
timeoutSeconds?: number;
escalationPath?: string[];
}
/**
* Create supervision parameters
*/
export function createSupervisionParams(params: Partial<SupervisionParams>): SupervisionParams {
return {
autoApproveThreshold: params.autoApproveThreshold ?? 0.8,
requireApprovalFor: params.requireApprovalFor || [],
supervisorRoles: params.supervisorRoles || ['supervisor'],
timeoutSeconds: params.timeoutSeconds ?? 300,
escalationPath: params.escalationPath || []
};
}
/**
* Check if result requires supervision
*/
export function requiresSupervision(
result: any,
supervisionParams: SupervisionParams
): boolean {
// Check if this type of result requires supervision
if (supervisionParams.requireApprovalFor?.includes(result.type)) {
return true;
}
// Additional logic to determine if supervision is needed
// based on confidence scores, content, etc.
return false;
}
/**
* Create supervision request
*/
export function createSupervisionRequest(
commandId: string,
results: any[],
context: string,
agentId: string,
priority: number = 5
): SupervisionRequest {
return {
commandId,
results,
context,
requestedBy: agentId,
priority
};
}
Agent Types
Specialist Agent Example
/**
* Example specialized agent implementation
*/
export class DataProcessingAgent extends BaseAgent {
constructor(id: string, name: string) {
super(id, name, ['data_processing', 'data_analysis', 'data_transformation']);
}
async executeCommand(command: DbCommand): Promise<CommandExecutionResult> {
try {
// Check for custom execution order
if (command.execution_order && command.execution_order.length > 0) {
return await this.executeWithCustomOrder(command);
}
// Execute appropriate action based on task
switch (command.task) {
case 'process_data':
return await this.handleDataProcessing(command);
case 'analyze_data':
return await this.handleDataAnalysis(command);
default:
throw new Error(`Unsupported task: ${command.task}`);
}
} catch (error: any) {
return {
status: 'failed',
error: error.message
};
}
}
private async executeWithCustomOrder(command: DbCommand): Promise<CommandExecutionResult> {
const results: any[] = [];
// Execute tools in the specified order
for (const stepName of command.execution_order || []) {
// Find the tool
const tool = (command.tools || []).find(t => t.name === stepName);
if (tool) {
try {
tool.status = 'running';
const result = await this.executeTool(tool);
tool.status = 'completed';
results.push({
tool: tool.name,
status: 'completed',
result
});
} catch (error) {
tool.status = 'failed';
results.push({
tool: tool.name,
status: 'failed',
error: error instanceof Error ? error.message : String(error)
});
}
}
}
// Process targets with accumulated results
const targetResults = await this.processTargets(command.targets || [], results);
// Check if supervision is required
if (command.supervision_params &&
targetResults.some(r => requiresSupervision(r, command.supervision_params))) {
// Request supervision
const supervisionRequest = createSupervisionRequest(
command.id,
targetResults,
command.context || '',
this.id,
command.priority
);
const supervisionResult = await this.requestSupervision(supervisionRequest);
if (supervisionResult.status === 'approved') {
return {
status: 'completed',
results: targetResults
};
} else if (supervisionResult.status === 'modified') {
return {
status: 'completed',
results: supervisionResult.modifications || targetResults
};
} else {
return {
status: 'failed',
error: 'Supervision rejected the results',
results: targetResults
};
}
}
return {
status: 'completed',
results: targetResults
};
}
private async handleDataProcessing(command: DbCommand): Promise<CommandExecutionResult> {
// Implementation of data processing
return {
status: 'completed',
results: [{
type: 'processed_data',
data: { processed: true }
}]
};
}
private async handleDataAnalysis(command: DbCommand): Promise<CommandExecutionResult> {
// Implementation of data analysis
return {
status: 'completed',
results: [{
type: 'analysis_result',
analysis: {
insights: [],
summary: "Analysis complete"
}
}]
};
}
async executeTool(tool: any): Promise<any> {
// Tool execution implementation
switch (tool.name) {
case 'data_transformation':
return this.transformData(tool.parameters);
case 'data_validation':
return this.validateData(tool.parameters);
default:
throw new Error(`Unsupported tool: ${tool.name}`);
}
}
private async transformData(params: any): Promise<any> {
// Data transformation implementation
return { transformed: true, data: params };
}
private async validateData(params: any): Promise<any> {
// Data validation implementation
return { valid: true, issues: [] };
}
private async requestSupervision(request: SupervisionRequest): Promise<SupervisionResponse> {
// Implementation to request supervision
// This could be a call to the CommandService
return {
requestId: generateCommandId(),
status: 'approved',
commandId: request.commandId
};
}
}
Supervisor Agent
/**
* Example supervisor agent implementation
*/
export class SupervisorAgent extends BaseAgent {
constructor(id: string, name: string) {
super(id, name, ['supervision', 'quality_control']);
}
async executeCommand(command: DbCommand): Promise<CommandExecutionResult> {
try {
// Execute appropriate action based on task
switch (command.task) {
case 'review_results':
return await this.reviewResults(command);
case 'quality_control':
return await this.performQualityControl(command);
default:
throw new Error(`Unsupported task: ${command.task}`);
}
} catch (error: any) {
return {
status: 'failed',
error: error.message
};
}
}
private async reviewResults(command: DbCommand): Promise<CommandExecutionResult> {
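// Extract the supervision payload from the command context. The coordinator
// serializes requestId, results, and commandId (requestId is not part of SupervisionRequest)
const supervisionRequest = JSON.parse(command.context || '{}') as
Pick<SupervisionRequest, 'results' | 'commandId'> & { requestId: string };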
// Review the results
const decision = await this.evaluateResults(supervisionRequest.results);
// Submit the supervision decision
const commandService = new CommandService();
await commandService.submitSupervisionDecision(supervisionRequest.requestId, decision);
return {
status: 'completed',
results: [{
type: 'supervision_decision',
decision
}]
};
}
private async performQualityControl(command: DbCommand): Promise<CommandExecutionResult> {
// Implementation of quality control
return {
status: 'completed',
results: [{
type: 'quality_report',
report: { quality: 'high', issues: [] }
}]
};
}
private async evaluateResults(results: any[]): Promise<SupervisionDecision> {
// Implementation to evaluate results and make a decision
return {
status: 'approved',
comments: 'Results meet quality standards'
};
}
}
Usage Examples
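Before the full workflows, here is a small sketch of how priority ordering and execution-order resolution behave. `CommandLike` and `ToolLike` are trimmed-down, assumed stand-ins for the DbCommand fields that sortCommandsByPriority and getNextExecutionStep read; the two functions are re-declared under different names only so the snippet runs on its own.

```typescript
// Assumed minimal shapes; the real DbCommand has more fields.
interface ToolLike {
  name: string;
  status: string;
}

interface CommandLike {
  id: string;
  priority?: number;
  execution_order?: string[];
  tools?: ToolLike[];
}

// Highest priority first; a missing priority is treated as 5.
function sortByPriority(commands: CommandLike[]): CommandLike[] {
  return [...commands].sort((a, b) => (b.priority ?? 5) - (a.priority ?? 5));
}

// First step in execution_order whose tool has not finished (completed or failed).
function nextStep(command: CommandLike): string | null {
  const finished = new Set(
    (command.tools ?? [])
      .filter(tool => tool.status === 'completed' || tool.status === 'failed')
      .map(tool => tool.name)
  );
  return (command.execution_order ?? []).find(step => !finished.has(step)) ?? null;
}

const pipeline: CommandLike = {
  id: 'cmd_c',
  priority: 8,
  execution_order: ['data_fetch', 'data_validation'],
  tools: [
    { name: 'data_fetch', status: 'completed' },
    { name: 'data_validation', status: 'not_initialized' },
  ],
};

const queue: CommandLike[] = [{ id: 'cmd_a', priority: 3 }, { id: 'cmd_b' }, pipeline];

const sortedIds = sortByPriority(queue).map(command => command.id);
// sortedIds is ['cmd_c', 'cmd_b', 'cmd_a']
const next = nextStep(pipeline);
// next is 'data_validation'
```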
Basic Command Execution with Custom Ordering
// Initialize services
const commandService = new CommandService();
const processingAgent = new DataProcessingAgent('agent_123', 'Data Processor');
// Create a command with execution order
const command = CommandFactory.createCommand({
task: 'process_data',
userId: 'user_456',
description: 'Process raw data file and transform to JSON',
context: 'Previous processing context...',
tools: [
createTool({
name: 'data_validation',
description: 'Validate input data',
type: 'synchronous',
executionIndex: 1
}),
createTool({
name: 'data_transformation',
description: 'Transform data format',
type: 'synchronous',
executionIndex: 2
}),
{
...commonTools.dataFetch({
source: 'database',
table: 'raw_data'
}),
// executionIndex belongs on the tool itself, not in its parameters
execution_index: 0
}
],
agentId: processingAgent.id,
executionOrder: ['data_fetch', 'data_validation', 'data_transformation'],
priority: 8
});
// Submit command
const commandId = await commandService.submitCommand(command);
// Track status changes
commandService.on('statusChange', (update) => {
console.log(`Command ${update.id} status: ${update.status}`);
});
// Execute command
await processingAgent.executeCommand(await getCommandById(commandId));
Multi-Agent Workflow with Supervision
// Coordinator pattern for multi-agent workflows with supervision
async function processDataPipelineWithSupervision(userId: string, dataId: string) {
// Create initial command
const command = CommandFactory.createCommand({
task: 'data_pipeline',
userId,
description: 'Process data through extraction, transformation, and analysis',
context: `Data ID: ${dataId}`,
priority: 7
});
// Submit to coordinator
const commandService = new CommandService();
const commandId = await commandService.submitCommand(command);
// Determine appropriate agents for each stage
const extractionAgent = new DataExtractionAgent('agent_extract', 'Data Extractor');
const transformationAgent = new DataTransformationAgent('agent_transform', 'Data Transformer');
const analysisAgent = new DataAnalysisAgent('agent_analysis', 'Data Analyzer');
const supervisorAgent = new SupervisorAgent('agent_supervisor', 'Data Quality Supervisor');
// Define supervision parameters
const supervisionParams = createSupervisionParams({
autoApproveThreshold: 0.9,
requireApprovalFor: ['analysis_result'],
supervisorRoles: ['data_quality_supervisor'],
timeoutSeconds: 120
});
// Execute extraction
const extractionCommand = CommandFactory.createCommand({
task: 'extract_data',
userId,
description: 'Extract data from source',
context: `Data ID: ${dataId}`,
agentId: extractionAgent.id,
executionOrder: ['fetch_source', 'extract_content', 'validate_extract']
});
const extractionCommandId = await commandService.submitCommand(extractionCommand);
const extractionResult = await extractionAgent.executeCommand(await getCommandById(extractionCommandId));
// Execute transformation with extraction results as context
const transformationCommand = CommandFactory.createCommand({
task: 'transform_data',
userId,
description: 'Transform extracted data',
context: JSON.stringify(extractionResult.results),
agentId: transformationAgent.id,
executionOrder: ['validate_input', 'transform_format', 'enrich_data']
});
const transformationCommandId = await commandService.submitCommand(transformationCommand);
const transformationResult = await transformationAgent.executeCommand(await getCommandById(transformationCommandId));
// Execute analysis with transformed data and supervision
const analysisCommand = CommandFactory.createCommand({
task: 'analyze_data',
userId,
description: 'Analyze transformed data',
context: JSON.stringify(transformationResult.results),
agentId: analysisAgent.id,
executionOrder: ['prepare_data', 'perform_analysis', 'generate_insights'],
supervisionParams
});
const analysisCommandId = await commandService.submitCommand(analysisCommand);
const analysisResult = await analysisAgent.executeCommand(await getCommandById(analysisCommandId));
// If supervision requested, create a supervision command
if (analysisResult.supervisionRequestId) {
const supervisionCommand = CommandFactory.createCommand({
task: 'review_results',
userId,
description: 'Review analysis results',
context: JSON.stringify({
requestId: analysisResult.supervisionRequestId,
results: analysisResult.results,
commandId: analysisCommandId
}),
agentId: supervisorAgent.id,
priority: 9 // High priority for supervision
});
const supervisionCommandId = await commandService.submitCommand(supervisionCommand);
await supervisorAgent.executeCommand(await getCommandById(supervisionCommandId));
}
}
Integration with External APIs
/**
* Extend BaseAgent to integrate with external APIs
*/
export class APIIntegrationAgent extends BaseAgent {
private apiClient: any;
constructor(id: string, name: string, apiClient: any) {
super(id, name, ['api_integration', 'data_transformation']);
this.apiClient = apiClient;
}
async executeCommand(command: DbCommand): Promise<CommandExecutionResult> {
try {
// Check if custom execution order is specified
if (command.execution_order && command.execution_order.length > 0) {
// Use custom execution order
const toolOrder = command.execution_order;
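// Copy before sorting so command.tools is not mutated, and skip tools
// that are not listed in the execution order
const orderedTools = [...(command.tools || [])]
.filter(tool => toolOrder.includes(tool.name))
.sort((a, b) => toolOrder.indexOf(a.name) - toolOrder.indexOf(b.name));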
// Execute tools in specified order
const toolResults = [];
for (const tool of orderedTools) {
try {
tool.status = 'running';
const result = await this.executeTool(tool);
tool.status = 'completed';
toolResults.push({
tool: tool.name,
status: 'completed',
result
});
} catch (error) {
tool.status = 'failed';
toolResults.push({
tool: tool.name,
status: 'failed',
error: error instanceof Error ? error.message : String(error)
});
}
}
// Process API request using results from ordered tools
const apiResponse = await this.makeAPIRequest(command, toolResults);
// Transform response based on targets
const transformedResults = await this.transformResults(apiResponse, command.targets || []);
// Check if supervision is required
if (command.supervision_params) {
const needsSupervision = transformedResults.some(
result => requiresSupervision(result, command.supervision_params)
);
if (needsSupervision) {
const supervisionRequest = createSupervisionRequest(
command.id,
transformedResults,
command.context || '',
this.id,
command.priority || 5
);
// Request supervision
const commandService = new CommandService();
const supervisionResponse = await commandService.requestSupervision(
command.id,
supervisionRequest
);
// Return with pending status until supervision completes
return {
status: 'pending_supervision',
results: transformedResults,
supervisionRequestId: supervisionResponse.requestId
};
}
}
return {
status: 'completed',
results: transformedResults
};
}
// Default execution flow without custom ordering
const toolResults = await this.executeTools(command.tools || []);
const apiResponse = await this.makeAPIRequest(command, toolResults);
const transformedResults = await this.transformResults(apiResponse, command.targets || []);
return {
status: 'completed',
results: transformedResults
};
} catch (error: any) {
return {
status: 'failed',
error: error.message
};
}
}
private async makeAPIRequest(command: DbCommand, toolResults: any[]): Promise<any> {
// Extract parameters from tool results
const params = this.extractAPIParameters(toolResults);
// Call appropriate API endpoint based on command task
switch (command.task) {
case 'fetch_data':
return this.apiClient.getData(params);
case 'create_resource':
return this.apiClient.createResource(params);
case 'update_resource':
return this.apiClient.updateResource(params);
default:
throw new Error(`Unsupported API task: ${command.task}`);
}
}
private extractAPIParameters(toolResults: any[]): Record<string, any> {
// Extract and combine parameters from tool results
const params: Record<string, any> = {};
for (const result of toolResults) {
if (result.status === 'completed' && result.result) {
Object.assign(params, result.result);
}
}
return params;
}
private async transformResults(apiResponse: any, targets: any[]): Promise<any[]> {
// Transform API response according to target specifications
return targets.map(target => {
const targetType = Object.keys(target)[0];
switch (targetType) {
case 'json':
return {
type: 'json',
content: this.transformToJSON(apiResponse, target.json)
};
case 'structured_data':
return {
type: 'structured_data',
content: this.transformToStructured(apiResponse, target.structured_data)
};
default:
return {
type: targetType,
content: apiResponse
};
}
});
}
private transformToJSON(data: any, spec: any): any {
// Transform data to JSON format based on spec
return data;
}
private transformToStructured(data: any, spec: any): any {
// Transform data to structured format based on spec
return {
formatted: true,
data: data
};
}
}
Error Handling Strategies
/**
* Retry mechanism for command execution
*/
export async function executeWithRetry(
agent: BaseAgent,
command: DbCommand,
maxRetries: number = 3,
backoffMs: number = 1000
): Promise<CommandExecutionResult> {
let lastError: Error | null = null;
for (let attempt = 1; attempt <= maxRetries; attempt++) {
try {
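const result = await agent.executeCommand(command);
// The example agents report failure by returning status 'failed' instead of
// throwing, so treat that as a retryable error as well
if (result.status !== 'failed') {
return result;
}
throw new Error(result.error || 'Command failed');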
} catch (error: any) {
lastError = error;
console.error(`Attempt ${attempt} failed: ${error.message}`);
if (attempt < maxRetries) {
// Wait with exponential backoff
const delay = backoffMs * Math.pow(2, attempt - 1);
await new Promise(resolve => setTimeout(resolve, delay));
}
}
}
return {
status: 'failed',
error: `Failed after ${maxRetries} attempts: ${lastError?.message}`
};
}
/**
* Circuit breaker for external services
*/
export class CircuitBreaker {
private failureCount: number = 0;
private lastFailureTime: number = 0;
private state: 'closed' | 'open' | 'half-open' = 'closed';
constructor(
private threshold: number = 5,
private timeout: number = 30000
) {}
async execute<T>(fn: () => Promise<T>): Promise<T> {
if (this.state === 'open') {
// Check if timeout has elapsed
const now = Date.now();
if (now - this.lastFailureTime > this.timeout) {
this.state = 'half-open';
} else {
throw new Error('Circuit breaker is open');
}
}
try {
const result = await fn();
this.onSuccess();
return result;
} catch (error) {
this.onFailure();
throw error;
}
}
private onSuccess(): void {
this.failureCount = 0;
this.state = 'closed';
}
private onFailure(): void {
this.failureCount++;
this.lastFailureTime = Date.now();
if (this.state === 'half-open' || this.failureCount >= this.threshold) {
this.state = 'open';
}
}
}
Best Practices
- Command Design
  - Keep commands focused on specific tasks
  - Include all necessary context in the command
  - Use the CommandFactory to ensure consistent command structure
  - Specify execution order for complex workflows
  - Set appropriate priority levels for time-sensitive tasks
- Agent Implementation
  - Extend BaseAgent for all agent implementations
  - Clearly define agent capabilities
  - Implement appropriate error handling
  - Use memory store for persistent data across commands
  - Support both default and custom execution ordering
- Tool Management
  - Use common tools when possible for consistency
  - Track tool execution status
  - Validate tool parameters before execution
  - Specify execution prerequisites for dependent tools
- Supervision Implementation
  - Define clear supervision parameters
  - Implement appropriate supervision escalation paths
  - Set timeouts for supervision requests
  - Create specialized supervisor agents for quality control
- API Integration
  - Use the APIIntegrationAgent pattern for external services
  - Implement circuit breakers for unstable services
  - Handle rate limiting and retries
  - Support transaction rollback for multi-step API operations
- Performance Optimization
  - Implement command caching for frequent requests
  - Use batch processing for related commands
  - Implement command prioritization based on urgency
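As an illustration of the tool-management advice above, parameter validation can be a small pure function that runs before executeTool. This is a sketch under assumptions: the `requiredParams` registry and the parameter names in it are hypothetical and not part of the library.

```typescript
// Assumed minimal tool shape; mirrors the name and parameters fields of createTool's output.
interface ToolDefinition {
  name: string;
  parameters: Record<string, unknown>;
}

// Hypothetical registry of required parameter names per tool.
const requiredParams: Record<string, string[]> = {
  data_fetch: ['source', 'table'],
  search: ['query'],
};

// Returns a list of problems; an empty list means the tool may be executed.
function validateToolParameters(tool: ToolDefinition): string[] {
  const required = requiredParams[tool.name];
  if (!required) {
    return [`unknown tool: ${tool.name}`];
  }
  return required
    .filter(key => tool.parameters[key] === undefined || tool.parameters[key] === null)
    .map(key => `missing parameter: ${key}`);
}

const problems = validateToolParameters({
  name: 'data_fetch',
  parameters: { source: 'database' },
});
// problems is ['missing parameter: table']
```

Returning a list of problems rather than throwing lets the caller mark the tool as failed with a complete message, in line with the status tracking used in executeTools.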
Agent Communication with Portkey
The Agentbase library provides a standardized way to call agents using Portkey, a unified interface for accessing different LLM providers. This allows easy parameterization of model types, providers, and output formats while providing a consistent experience across your application.
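The parameterization comes down to layering per-call options over defaults. Here is a minimal sketch of that merge, independent of the Portkey SDK: `ModelOptions` and `resolveOptions` are illustrative names, and the default values mirror the ones used by the connector defined in the next section. It also guards against an explicitly undefined override, which a plain object spread would copy over the default.

```typescript
type Provider = 'anthropic' | 'openai' | 'gemini';

// Illustrative subset of the options a call can carry.
interface ModelOptions {
  modelType: Provider;
  maxTokens: number;
  temperature: number;
  responseFormat: 'json' | 'text';
}

const defaults: ModelOptions = {
  modelType: 'anthropic',
  maxTokens: 4096,
  temperature: 0.7,
  responseFormat: 'text',
};

// Per-call overrides win, but only when they are actually set.
// Using ?? keeps legitimate falsy values such as temperature 0.
function resolveOptions(overrides: Partial<ModelOptions> = {}): ModelOptions {
  return {
    modelType: overrides.modelType ?? defaults.modelType,
    maxTokens: overrides.maxTokens ?? defaults.maxTokens,
    temperature: overrides.temperature ?? defaults.temperature,
    responseFormat: overrides.responseFormat ?? defaults.responseFormat,
  };
}

// A plain spread lets an explicit undefined shadow the default.
const plainSpread = { ...defaults, ...{ temperature: undefined } };
// plainSpread.temperature is undefined

const resolved = resolveOptions({ modelType: 'openai', temperature: undefined });
// resolved.modelType is 'openai' and resolved.temperature is 0.7
```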
PortkeyAgentConnector
/**
* Options for configuring model calls via Portkey
*/
export interface PortkeyModelOptions {
modelType: 'anthropic' | 'openai' | 'gemini';
modelId?: string;
maxTokens?: number;
temperature?: number;
topP?: number;
responseFormat?: 'json' | 'text';
systemPrompt?: string;
}
/**
* Configuration for Portkey API access
*/
export interface PortkeyConfig {
apiKey: string;
virtualKeys: Record<string, string>;
baseURL?: string;
}
/**
* Agent connector using Portkey for LLM access
*/
export class PortkeyAgentConnector {
private portkeyConfig: PortkeyConfig;
private defaultOptions: Partial<PortkeyModelOptions>;
constructor(config: PortkeyConfig, defaultOptions?: Partial<PortkeyModelOptions>) {
this.portkeyConfig = config;
this.defaultOptions = defaultOptions || {
modelType: 'anthropic',
maxTokens: 4096,
temperature: 0.7,
responseFormat: 'text'
};
}
/**
* Call an agent with messages using Portkey
*/
async callAgent(
messages: Array<{
role: 'system' | 'user' | 'assistant';
content: string | any;
}>,
options?: Partial<PortkeyModelOptions>
): Promise<any> {
try {
// Merge default options with provided options
const mergedOptions = { ...this.defaultOptions, ...options };
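// Fall back to 'anthropic' when no provider is set, matching the default in getModelOptions
const { modelType = 'anthropic', modelId, maxTokens, temperature, topP, responseFormat } = mergedOptions;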
// Get virtual key for the selected provider
const virtualKey = this.portkeyConfig.virtualKeys[modelType] || '';
// Initialize Portkey
const Portkey = require('portkey-ai').default;
const portkey = new Portkey({
apiKey: this.portkeyConfig.apiKey,
virtualKey,
baseURL: this.portkeyConfig.baseURL || 'https://api.portkey.ai/v1'
});
// Determine model options based on provider
const modelOptions = this.getModelOptions(modelType, modelId, maxTokens);
// Add response format if specified
if (responseFormat === 'json') {
if (modelType === 'anthropic') {
modelOptions.response_format = { type: 'json' };
} else if (modelType === 'openai') {
modelOptions.response_format = { type: 'json_object' };
}
}
// Set temperature if provided
if (temperature !== undefined) {
modelOptions.temperature = temperature;
}
// Set top_p if provided
if (topP !== undefined) {
modelOptions.top_p = topP;
}
// Call the API
const response = await portkey.chat.completions.create({
messages,
...modelOptions
});
// Process and return the response
return this.processResponse(response, responseFormat);
} catch (error) {
console.error('[PortkeyAgentConnector] Error:', error);
throw error;
}
}
/**
* Get model options based on provider and model ID
*/
private getModelOptions(
modelType: 'anthropic' | 'openai' | 'gemini',
modelId?: string,
maxTokens?: number
): any {
switch (modelType) {
case 'anthropic':
return {
model: modelId || 'claude-3-5-sonnet-20240620',
max_tokens: maxTokens || 4096
};
case 'openai':
return {
model: modelId || 'gpt-5-nano',
max_tokens: maxTokens || 4096
};
case 'gemini':
return {
model: modelId || 'gemini-1.5-pro',
max_tokens: maxTokens || 4096
};
default:
return {
model: modelId || 'claude-3-5-sonnet-20240620',
max_tokens: maxTokens || 4096
};
}
}
/**
* Process the response based on format
*/
private processResponse(response: any, responseFormat?: 'json' | 'text'): any {
const content = response?.choices?.[0]?.message?.content || '';
if (responseFormat === 'json') {
try {
// Try to parse as JSON
return JSON.parse(content);
} catch (error) {
// If parsing fails, attempt to extract JSON from text
const jsonMatch = content.match(/```json\n([\s\S]*?)\n```/) ||
content.match(/\{[\s\S]*\}/);
if (jsonMatch) {
try {
return JSON.parse(jsonMatch[1] || jsonMatch[0]);
} catch {
// Return the raw content if JSON extraction fails
return content;
}
}
return content;
}
}
return content;
}
}
Integration with BaseAgent
You can extend the BaseAgent class to integrate with the PortkeyAgentConnector:
/**
* Agent that uses Portkey for LLM communication
*/
export class PortkeyAgent extends BaseAgent {
private connector: PortkeyAgentConnector;
private defaultOptions: Partial<PortkeyModelOptions>;
constructor(
id: string,
name: string,
connector: PortkeyAgentConnector,
capabilities: string[] = [],
defaultOptions?: Partial<PortkeyModelOptions>
) {
super(id, name, capabilities);
this.connector = connector;
this.defaultOptions = defaultOptions || {};
}
/**
* Execute a command using Portkey for LLM responses
*/
async executeCommand(command: DbCommand): Promise<CommandExecutionResult> {
try {
// Prepare messages for the agent
const messages = this.prepareMessagesFromCommand(command);
// Determine model options
const modelOptions: PortkeyModelOptions = {
modelType: command.model_type || this.defaultOptions.modelType || 'anthropic',
modelId: command.model_id || this.defaultOptions.modelId,
maxTokens: command.max_tokens || this.defaultOptions.maxTokens,
// Use ?? rather than || so an explicit temperature of 0 is not discarded
temperature: command.temperature ?? this.defaultOptions.temperature,
responseFormat: command.response_format || this.defaultOptions.responseFormat || 'text'
};
// Call the agent through Portkey
const response = await this.connector.callAgent(messages, modelOptions);
// Process the response
const results = this.processAgentResponse(response, command);
return {
status: 'completed',
results
};
} catch (error: any) {
return {
status: 'failed',
error: error.message
};
}
}
/**
* Prepare messages for the agent from command
*/
private prepareMessagesFromCommand(command: DbCommand): any[] {
const messages = [];
// Add system message if specified
if (command.system_prompt) {
messages.push({
role: 'system',
content: command.system_prompt
});
}
// Add context as a user message if provided
if (command.context) {
messages.push({
role: 'user',
content: command.context
});
}
// Add the main task as a user message
messages.push({
role: 'user',
content: command.task
});
return messages;
}
/**
* Process the agent response
*/
private processAgentResponse(response: any, command: DbCommand): any[] {
// Handle different response formats
if (command.response_format === 'json' && typeof response === 'object') {
return [{ type: 'json_result', content: response }];
}
// For text responses
return [{ type: 'text_result', content: response }];
}
}
Usage Example
// Initialize the Portkey connector
const portkeyConfig: PortkeyConfig = {
apiKey: process.env.PORTKEY_API_KEY || '',
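// Note: Portkey virtual keys are identifiers issued by Portkey, so these
// variables should hold those IDs rather than raw provider API keys
virtualKeys: {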
'anthropic': process.env.ANTHROPIC_API_KEY || '',
'openai': process.env.OPENAI_API_KEY || '',
'gemini': process.env.GEMINI_API_KEY || ''
},
baseURL: 'https://api.portkey.ai/v1'
};
// Create the connector with default options
const connector = new PortkeyAgentConnector(portkeyConfig, {
modelType: 'anthropic',
modelId: 'claude-3-5-sonnet-20240620',
temperature: 0.7,
responseFormat: 'json'
});
// Create an agent that uses the connector
const dataAnalysisAgent = new PortkeyAgent(
'agent_data_analysis',
'Data Analysis Agent',
connector,
['data_analysis', 'data_visualization'],
{ responseFormat: 'json' }
);
// Create a command to be executed
const command = CommandFactory.createCommand({
task: 'Analyze the sales data for Q2 2023 and identify key trends',
userId: 'user_123',
description: 'Quarterly sales analysis',
context: JSON.stringify({
salesData: [/* array of sales data */],
previousQuarterData: [/* array of previous quarter data */]
}),
model_type: 'anthropic',
model_id: 'claude-3-opus-20240229',
response_format: 'json'
});
// Execute the command
const commandService = new CommandService();
const commandId = await commandService.submitCommand(command);
const result = await dataAnalysisAgent.executeCommand(await getCommandById(commandId));
console.log('Analysis result:', result);
Best Practices for Portkey Integration
- Environment Variables
  - Store API keys in environment variables
  - Use a secure vault for production environments
  - Implement key rotation mechanisms
- Error Handling
  - Implement robust error handling for API failures
  - Add retry mechanisms with exponential backoff
  - Log detailed error information for debugging
- Response Processing
  - Handle different response formats (JSON, text) appropriately
  - Validate JSON responses to ensure they meet the expected schema
  - Implement fallback mechanisms for parsing failures
- Performance Optimization
  - Cache identical requests to reduce API calls
  - Use streaming for long-running generations
  - Implement request batching when possible
- Model Selection
  - Choose appropriate models based on task complexity
  - Fall back to simpler models if premium models are unavailable
  - Monitor token usage to optimize costs
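The retry advice under Error Handling could be sketched roughly as follows. This is a minimal illustration, not part of Agentbase or the Portkey SDK: `withRetry` and `backoffDelayMs` are hypothetical helper names, and the defaults (three retries, 500 ms base delay, 8 s cap) are assumptions chosen only for the example.

```typescript
// Hypothetical helpers for illustration; neither Agentbase nor Portkey provides them.

/** Delay before retry number `attempt` (0-based): baseMs * 2^attempt, capped at maxMs. */
function backoffDelayMs(attempt: number, baseMs = 500, maxMs = 8000): number {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

/** Resolves after `ms` milliseconds. */
const sleep = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

/**
 * Runs `fn` and, if it throws, retries up to `maxRetries` more times,
 * waiting an exponentially growing delay between attempts.
 * Rethrows the last error once the retries are exhausted.
 */
async function withRetry<T>(
  fn: () => Promise<T>,
  maxRetries = 3,
  baseMs = 500,
  maxMs = 8000
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error;
      if (attempt === maxRetries) break; // no retries left
      await sleep(backoffDelayMs(attempt, baseMs, maxMs));
    }
  }
  throw lastError;
}
```

A connector call could then be wrapped as `await withRetry(() => connector.callAgent(messages, options))`. The sketch retries on every error; a production version would likely skip non-retryable failures such as authentication errors and add random jitter to the delay.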