Building Your First AI Operator
Step-by-step guide to creating and deploying an AI operator
This guide will walk you through creating, configuring, and deploying your first AI operator using Optum Protocol. We'll build a simple warehouse automation operator that can identify objects, make decisions, and execute actions.
Prerequisites
Before starting, ensure you have:
- Optum Protocol CLI installed (npm install -g optum-protocol)
- Docker for containerized deployment
- Access to a VLA model (we'll use RT-2 in this example)
- Hardware or simulation environment for testing
Step 1: Initialize Your Operator Project
# Create a new operator project
optum init warehouse-operator
# Navigate to project directory
cd warehouse-operator
# Explore the generated structure
ls -la
This creates a basic project structure:
warehouse-operator/
├── config/
│   ├── operator.yml          # Main operator configuration
│   ├── model.yml             # VLA model configuration
│   └── safety.yml            # Safety rules and constraints
├── src/
│   ├── operator.py           # Main operator implementation
│   ├── perception.py         # Vision and sensor processing
│   ├── reasoning.py          # Decision-making logic
│   └── actions.py            # Action execution handlers
├── tests/
│   ├── test_operator.py      # Unit tests
│   └── test_integration.py   # Integration tests
├── deployment/
│   ├── Dockerfile            # Container configuration
│   └── docker-compose.yml    # Local deployment setup
└── requirements.txt          # Python dependencies
Step 2: Configure Your Operator
Edit config/operator.yml:
# config/operator.yml
operator:
  name: warehouse-operator
  version: "1.0.0"
  description: "Autonomous warehouse item management operator"

  # Specify the VLA model to use
  model:
    type: "vla"
    name: "rt2-base"
    version: "latest"
    config_file: "config/model.yml"

  # Define operator capabilities
  capabilities:
    - vision_processing
    - object_manipulation
    - inventory_management
    - natural_language_interaction

  # Resource requirements
  resources:
    cpu: "4 cores"
    memory: "8Gi"
    gpu: "1x NVIDIA RTX 4080"
    storage: "50Gi"

  # Safety configuration
  safety:
    config_file: "config/safety.yml"
    emergency_stop_enabled: true
    max_action_rate: 10  # actions per second

  # MCP server connections
  mcp_servers:
    - name: "warehouse_system"
      uri: "mcp://warehouse.local:8080"
      tools:
        - "move_item"
        - "scan_barcode"
        - "update_inventory"
        - "get_item_location"
    - name: "vision_system"
      uri: "mcp://vision.local:8081"
      resources:
        - "camera_feeds"
        - "object_detection_api"
Configure the VLA model in config/model.yml:
# config/model.yml
model:
  architecture: "rt2"

  # Model-specific parameters
  parameters:
    vision_encoder: "vit-base"
    language_model: "t5-base"
    action_space_size: 256
    max_sequence_length: 512

  # Optimization settings
  optimization:
    precision: "fp16"
    batch_size: 8
    enable_tensorrt: true
    enable_model_parallelism: false

  # Input/output specifications
  inputs:
    vision:
      format: "rgb"
      resolution: [224, 224]
      channels: 3
    language:
      tokenizer: "t5-tokenizer"
      max_length: 128
  outputs:
    actions:
      type: "discrete"
      num_actions: 256
    confidence:
      type: "float"
      range: [0.0, 1.0]
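The outputs section pins down the contract your code consumes: a discrete action space of 256 ids plus a confidence in [0.0, 1.0]. As a rough illustration of what that implies downstream, here is a hedged sketch of decoding a raw logit vector into that shape. The actual return type of VLAModel.predict depends on the Optum SDK, so the NumPy-array assumption and the decode_vla_output helper are illustrative only.

# Hypothetical post-processing sketch: maps raw VLA output onto the spec in config/model.yml
# (256 discrete actions plus a confidence in [0.0, 1.0]). The real return type of
# VLAModel.predict depends on the Optum SDK; here we assume a plain NumPy logit vector.
import numpy as np

NUM_ACTIONS = 256  # must match outputs.actions.num_actions in config/model.yml

def decode_vla_output(logits: np.ndarray) -> dict:
    """Turn a (NUM_ACTIONS,) logit vector into an action id and a confidence score."""
    assert logits.shape == (NUM_ACTIONS,), f"unexpected output shape {logits.shape}"

    # Softmax over the discrete action space gives a proper probability distribution.
    probs = np.exp(logits - logits.max())
    probs /= probs.sum()

    action_id = int(probs.argmax())
    return {"action_id": action_id, "confidence": float(probs[action_id])}

# Example: a random logit vector decodes to some action id with its softmax probability.
print(decode_vla_output(np.random.randn(NUM_ACTIONS)))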
Set up safety rules in config/safety.yml:
# config/safety.yml
safety_rules:
  # Physical boundaries
  physical_constraints:
    workspace_bounds:
      x: [-5.0, 5.0]   # meters
      y: [-5.0, 5.0]   # meters
      z: [0.0, 3.0]    # meters
    forbidden_zones:
      - name: "human_workspace"
        bounds:
          x: [-1.0, 1.0]
          y: [-1.0, 1.0]
          z: [0.0, 2.0]
      - name: "emergency_exit"
        bounds:
          x: [4.0, 5.0]
          y: [-1.0, 1.0]
          z: [0.0, 3.0]

  # Behavioral constraints
  behavioral_limits:
    max_speed: 2.0          # m/s
    max_acceleration: 1.0   # m/s²
    max_force: 50.0         # Newtons
    action_timeouts:
      move_item: 30         # seconds
      scan_barcode: 5       # seconds
      update_inventory: 10  # seconds

  # Monitoring thresholds
  monitoring:
    cpu_usage_threshold: 85     # percent
    memory_usage_threshold: 90  # percent
    error_rate_threshold: 5     # percent
    safety_violations_max: 3    # before emergency stop
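To make the physical constraints concrete, the sketch below shows how workspace_bounds and forbidden_zones translate into a runtime position check. The Optum runtime applies its own enforcement; this only illustrates the geometry, and the is_position_allowed helper and PyYAML dependency are assumptions of the example.

# Hypothetical safety check mirroring config/safety.yml. The Optum runtime enforces its own
# limits; this sketch only shows how the bounds data translates into a position check.
import yaml

def _inside(position: dict, bounds: dict) -> bool:
    """True if an {x, y, z} position lies inside an axis-aligned bounds box."""
    return all(bounds[axis][0] <= position[axis] <= bounds[axis][1] for axis in ("x", "y", "z"))

def is_position_allowed(position: dict, safety_config_path: str = "config/safety.yml") -> bool:
    """Allowed means: inside the workspace and outside every forbidden zone."""
    with open(safety_config_path) as f:
        constraints = yaml.safe_load(f)["safety_rules"]["physical_constraints"]

    if not _inside(position, constraints["workspace_bounds"]):
        return False
    return not any(_inside(position, zone["bounds"]) for zone in constraints["forbidden_zones"])

# Example: the origin falls inside the human_workspace zone, so it is rejected.
print(is_position_allowed({"x": 0.0, "y": 0.0, "z": 1.0}))  # False
print(is_position_allowed({"x": 3.0, "y": 3.0, "z": 1.0}))  # True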
Step 3: Implement the Operator
Create the main operator class in src/operator.py:
# src/operator.py
import asyncio
from typing import Dict, Any, Optional

from optum import BaseOperator, MCPClient, VLAModel

from .perception import PerceptionModule
from .reasoning import ReasoningModule
from .actions import ActionModule


class WarehouseOperator(BaseOperator):
    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)

        # Initialize components
        self.perception = PerceptionModule(config.get('perception', {}))
        self.reasoning = ReasoningModule(config.get('reasoning', {}))
        self.actions = ActionModule(config.get('actions', {}))

        # Initialize VLA model
        self.vla_model = VLAModel.load(
            config['model']['name'],
            config['model']['config_file']
        )

        # Initialize MCP client
        self.mcp_client = MCPClient(config['mcp_servers'])

        # Operator state
        self.current_task = None
        self.environment_state = {}

    async def initialize(self):
        """Initialize the operator and its components."""
        await self.perception.initialize()
        await self.reasoning.initialize()
        await self.actions.initialize()
        await self.mcp_client.connect()

        self.logger.info("Warehouse operator initialized successfully")

    async def perceive(self) -> Dict[str, Any]:
        """Gather information from the environment."""
        # Get visual input from cameras
        visual_data = await self.perception.get_visual_data()

        # Get sensor data
        sensor_data = await self.perception.get_sensor_data()

        # Get system status
        system_status = await self.mcp_client.read_resource(
            "warehouse_system://status"
        )

        # Combine all perceptual data
        perception_data = {
            'visual': visual_data,
            'sensors': sensor_data,
            'system': system_status,
            'timestamp': self.get_current_timestamp()
        }

        return perception_data

    async def reason(self, perception_data: Dict[str, Any]) -> Dict[str, Any]:
        """Process perceptual data and decide on actions."""
        # Prepare input for VLA model
        vla_input = self.prepare_vla_input(perception_data)

        # Get action predictions from VLA model
        vla_output = await self.vla_model.predict(vla_input)

        # Process VLA output through reasoning module
        reasoning_result = await self.reasoning.process(
            perception_data,
            vla_output
        )

        return reasoning_result

    async def act(self, reasoning_result: Dict[str, Any]) -> Dict[str, Any]:
        """Execute planned actions."""
        planned_actions = reasoning_result.get('planned_actions', [])
        execution_results = []

        for action in planned_actions:
            try:
                # Execute action through MCP
                result = await self.execute_action(action)
                execution_results.append({
                    'action': action,
                    'result': result,
                    'status': 'success'
                })
            except Exception as e:
                self.logger.error(f"Action execution failed: {e}")
                execution_results.append({
                    'action': action,
                    'error': str(e),
                    'status': 'failed'
                })

                # Handle failure recovery
                await self.handle_action_failure(action, e)

        return {
            'executed_actions': execution_results,
            'success_rate': self.calculate_success_rate(execution_results)
        }

    async def execute_action(self, action: Dict[str, Any]) -> Any:
        """Execute a specific action using MCP tools."""
        action_type = action['type']
        action_params = action['parameters']

        if action_type == 'move_item':
            return await self.mcp_client.call_tool(
                'move_item',
                {
                    'item_id': action_params['item_id'],
                    'source_location': action_params['source'],
                    'target_location': action_params['target']
                }
            )
        elif action_type == 'scan_barcode':
            return await self.mcp_client.call_tool(
                'scan_barcode',
                {
                    'location': action_params['location']
                }
            )
        elif action_type == 'update_inventory':
            return await self.mcp_client.call_tool(
                'update_inventory',
                {
                    'item_id': action_params['item_id'],
                    'quantity': action_params['quantity'],
                    'location': action_params['location']
                }
            )
        else:
            raise ValueError(f"Unknown action type: {action_type}")

    def prepare_vla_input(self, perception_data: Dict[str, Any]) -> Dict[str, Any]:
        """Prepare input for the VLA model."""
        # Extract visual features
        visual_input = self.perception.extract_visual_features(
            perception_data['visual']
        )

        # Prepare language context
        language_input = self.prepare_language_context(perception_data)

        return {
            'vision': visual_input,
            'language': language_input,
            'context': {
                'timestamp': perception_data['timestamp'],
                'system_state': perception_data['system']
            }
        }

    def prepare_language_context(self, perception_data: Dict[str, Any]) -> str:
        """Create language context for the VLA model."""
        if self.current_task:
            task_description = f"Current task: {self.current_task['description']}"
        else:
            task_description = "Waiting for task assignment"

        system_status = perception_data['system']['status']

        context = f"""
        {task_description}
        System Status: {system_status}
        Available Actions: move_item, scan_barcode, update_inventory

        Please analyze the current situation and decide on the best action to take.
        """

        return context.strip()

    async def handle_action_failure(self, action: Dict[str, Any], error: Exception):
        """Handle action execution failures."""
        self.logger.warning(f"Action failed: {action}, Error: {error}")

        # Implement recovery strategies
        if "network" in str(error).lower():
            # Network error - retry after delay
            await asyncio.sleep(1.0)
        elif "safety" in str(error).lower():
            # Safety violation - emergency stop
            await self.emergency_stop()
        else:
            # General error - log and continue
            await self.log_error(action, error)

    async def main_loop(self):
        """Main operator execution loop."""
        while self.is_running:
            try:
                # Perceive-Reason-Act cycle
                perception_data = await self.perceive()
                reasoning_result = await self.reason(perception_data)
                action_result = await self.act(reasoning_result)

                # Update internal state
                self.update_state(perception_data, reasoning_result, action_result)

                # Safety check
                await self.safety_check()

                # Brief pause before next cycle
                await asyncio.sleep(0.1)  # 10 Hz operation
            except Exception as e:
                self.logger.error(f"Main loop error: {e}")
                await self.handle_main_loop_error(e)
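operator.py imports ReasoningModule and ActionModule from src/reasoning.py and src/actions.py, which this guide does not walk through. So that the imports resolve and the Perceive-Reason-Act loop runs end to end, here is a minimal sketch of both files; the pass-through policy in process() is a placeholder (and assumes the VLA output is a dict with an 'actions' key) to be replaced with your own planning logic.

# src/reasoning.py -- minimal stub so the operator's imports resolve; replace with real planning logic
from typing import Dict, Any

class ReasoningModule:
    def __init__(self, config: Dict[str, Any]):
        self.config = config

    async def initialize(self):
        pass

    async def process(self, perception_data: Dict[str, Any], vla_output: Dict[str, Any]) -> Dict[str, Any]:
        # Trivial policy: pass the VLA model's suggested actions straight through as the plan.
        # Assumes vla_output is a dict with an 'actions' key; adjust to your SDK's output format.
        return {"planned_actions": vla_output.get("actions", [])}


# src/actions.py -- minimal stub; execution itself happens through MCP tools in execute_action()
class ActionModule:
    def __init__(self, config: Dict[str, Any]):
        self.config = config

    async def initialize(self):
        pass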
Step 4: Implement Perception Module
Create src/perception.py:
# src/perception.py
import time

import cv2
import numpy as np
from typing import Dict, Any, List


class PerceptionModule:
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.cameras = []
        self.sensors = []

    async def initialize(self):
        """Initialize cameras and sensors."""
        # Initialize cameras
        for camera_config in self.config.get('cameras', []):
            camera = cv2.VideoCapture(camera_config['device_id'])
            self.cameras.append({
                'capture': camera,
                'name': camera_config['name'],
                'position': camera_config['position']
            })

        # Initialize sensors (simulated for this example)
        self.sensors = self.config.get('sensors', [])

    async def get_visual_data(self) -> Dict[str, Any]:
        """Capture visual data from all cameras."""
        visual_data = {}

        for camera in self.cameras:
            ret, frame = camera['capture'].read()
            if ret:
                # Preprocess frame
                processed_frame = self.preprocess_frame(frame)
                visual_data[camera['name']] = {
                    'frame': processed_frame,
                    'timestamp': time.time(),  # wall-clock capture time
                    'camera_position': camera['position']
                }

        return visual_data

    def preprocess_frame(self, frame: np.ndarray) -> np.ndarray:
        """Preprocess camera frame for VLA model."""
        # Resize to model input size
        frame = cv2.resize(frame, (224, 224))

        # Convert BGR to RGB
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # Normalize pixel values
        frame = frame.astype(np.float32) / 255.0

        return frame

    async def get_sensor_data(self) -> Dict[str, Any]:
        """Read data from all sensors."""
        sensor_data = {}

        for sensor in self.sensors:
            if sensor['type'] == 'temperature':
                # Simulate temperature reading
                sensor_data['temperature'] = np.random.normal(22.0, 1.0)
            elif sensor['type'] == 'humidity':
                # Simulate humidity reading
                sensor_data['humidity'] = np.random.normal(50.0, 5.0)
            elif sensor['type'] == 'proximity':
                # Simulate proximity sensor
                sensor_data['proximity'] = np.random.uniform(0.1, 2.0)

        return sensor_data

    def extract_visual_features(self, visual_data: Dict[str, Any]) -> np.ndarray:
        """Extract features from visual data for VLA model input."""
        # Combine frames from all cameras
        frames = []
        for camera_name, camera_data in visual_data.items():
            frames.append(camera_data['frame'])

        if frames:
            # Stack frames along the channel axis (simple concatenation for this example)
            combined_frame = np.concatenate(frames, axis=2)
            return combined_frame
        else:
            # Return empty frame if no visual data
            return np.zeros((224, 224, 3), dtype=np.float32)
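The scaffold also generates a tests/ directory. The two pure functions in PerceptionModule, preprocess_frame and extract_visual_features, can be exercised without cameras or the Optum runtime; the pytest sketch below (a hypothetical tests/test_perception.py, assuming pytest and NumPy are installed) shows the kind of checks worth running before deployment.

# tests/test_perception.py -- hypothetical pytest sketch for the camera-free parts of PerceptionModule
import numpy as np
from src.perception import PerceptionModule

def test_preprocess_frame_shape_and_range():
    module = PerceptionModule(config={})
    raw = np.random.randint(0, 256, size=(480, 640, 3), dtype=np.uint8)  # fake BGR camera frame

    processed = module.preprocess_frame(raw)

    assert processed.shape == (224, 224, 3)
    assert processed.dtype == np.float32
    assert 0.0 <= processed.min() and processed.max() <= 1.0

def test_extract_visual_features_handles_no_cameras():
    module = PerceptionModule(config={})

    features = module.extract_visual_features({})  # no visual data available

    assert features.shape == (224, 224, 3)
    assert not features.any()  # all zeros when there is nothing to see

Run the tests with pytest from the project root before moving on to deployment.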
Step 5: Deploy Your Operator
Build and deploy your operator:
# Build the operator container
optum build
# Test locally
optum test --local
# Deploy to development environment
optum deploy --env development
# Monitor deployment
optum status warehouse-operator
# View logs
optum logs warehouse-operator --follow
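The container produced by optum build needs an entrypoint that wires the configuration from Step 2 into the operator class. The generated scaffold may already include one; if not, a minimal sketch could look like the following, where src/main.py is a hypothetical file, PyYAML is assumed, and mcp_client.disconnect() is an assumed cleanup method rather than a documented API.

# src/main.py -- hypothetical entrypoint; the scaffold from `optum init` may already provide one
import asyncio
import yaml

from src.operator import WarehouseOperator

async def run() -> None:
    # Load the operator configuration written in Step 2.
    with open("config/operator.yml") as f:
        config = yaml.safe_load(f)["operator"]

    operator = WarehouseOperator(config)
    await operator.initialize()

    try:
        # Run the Perceive-Reason-Act loop until the operator is stopped.
        await operator.main_loop()
    finally:
        await operator.mcp_client.disconnect()  # assumed cleanup hook on the MCP client

if __name__ == "__main__":
    asyncio.run(run())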
Step 6: Monitor and Iterate
Use the Optum Protocol dashboard to monitor your operator:
# Open monitoring dashboard
optum dashboard
# Get performance metrics
optum metrics warehouse-operator
# Debug specific issues
optum debug warehouse-operator --issue-id abc123
Next Steps
Now that you have a basic operator running, you can:
- Add more sophisticated reasoning by implementing custom logic in the reasoning module
- Integrate additional sensors and expand the perception capabilities
- Implement learning mechanisms to improve performance over time
- Add multi-operator coordination for fleet-based operations
- Enhance safety protocols with more sophisticated monitoring
Continue to the advanced guides to learn about these topics and more complex operator implementations.
Troubleshooting
Common issues and solutions:
Model Loading Issues:
# Check model availability
optum model list --type vla
# Download specific model
optum model download rt2-base
# Verify model configuration
optum model validate config/model.yml
MCP Connection Issues:
# Test MCP server connectivity
optum mcp test warehouse_system
# Debug MCP communication
optum mcp debug --server warehouse_system --verbose
Resource Allocation Issues:
# Check available resources
optum resources --node-info
# Adjust resource requirements
optum config edit config/operator.yml
Your first AI operator is now ready for action! Optum Protocol handles the orchestration while your operator focuses on the warehouse automation tasks you've defined.