Building Your First Operator

Step-by-step guide to creating and deploying an AI operator

This guide walks you through creating, configuring, and deploying your first AI operator with Optum Protocol. The example is a simple warehouse automation operator that identifies objects, makes decisions, and executes actions.

Prerequisites

Before starting, ensure you have:

  • Optum Protocol CLI installed (npm install -g optum-protocol)
  • Docker for containerized deployment
  • Access to a vision-language-action (VLA) model (we'll use RT-2 in this example)
  • Hardware or simulation environment for testing

Step 1: Initialize Your Operator Project

# Create a new operator project
optum init warehouse-operator

# Navigate to project directory
cd warehouse-operator

# Explore the generated structure
ls -la

This creates a basic project structure:

warehouse-operator/
├── config/
│   ├── operator.yml          # Main operator configuration
│   ├── model.yml            # VLA model configuration
│   └── safety.yml           # Safety rules and constraints
├── src/
│   ├── operator.py          # Main operator implementation
│   ├── perception.py        # Vision and sensor processing
│   ├── reasoning.py         # Decision-making logic
│   └── actions.py           # Action execution handlers
├── tests/
│   ├── test_operator.py     # Unit tests
│   └── test_integration.py  # Integration tests
├── deployment/
│   ├── Dockerfile           # Container configuration
│   └── docker-compose.yml   # Local deployment setup
└── requirements.txt         # Python dependencies
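
To confirm that the generated layout matches the tree above, a small check like the following can help. It is a sketch that uses only the Python standard library; the file list is copied from the tree, and `missing_project_files` is a hypothetical helper name, not part of the Optum CLI.

```python
from pathlib import Path
from typing import List

# Paths copied from the generated structure shown above.
EXPECTED_FILES = [
    "config/operator.yml",
    "config/model.yml",
    "config/safety.yml",
    "src/operator.py",
    "src/perception.py",
    "src/reasoning.py",
    "src/actions.py",
    "tests/test_operator.py",
    "tests/test_integration.py",
    "deployment/Dockerfile",
    "deployment/docker-compose.yml",
    "requirements.txt",
]


def missing_project_files(root: str) -> List[str]:
    """Return the expected files that do not exist under root."""
    base = Path(root)
    return [rel for rel in EXPECTED_FILES if not (base / rel).is_file()]


if __name__ == "__main__":
    missing = missing_project_files(".")
    if missing:
        print("Missing files:", ", ".join(missing))
    else:
        print("Project layout looks complete")
```

Run it from inside the warehouse-operator directory; an empty result means every file in the tree is present.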

Step 2: Configure Your Operator

Edit config/operator.yml:

# config/operator.yml
operator:
  name: warehouse-operator
  version: "1.0.0"
  description: "Autonomous warehouse item management operator"
  
  # Specify the VLA model to use
  model:
    type: "vla"
    name: "rt2-base"
    version: "latest"
    config_file: "config/model.yml"
  
  # Define operator capabilities
  capabilities:
    - vision_processing
    - object_manipulation
    - inventory_management
    - natural_language_interaction
  
  # Resource requirements
  resources:
    cpu: "4 cores"
    memory: "8Gi"
    gpu: "1x NVIDIA RTX 4080"
    storage: "50Gi"
  
  # Safety configuration
  safety:
    config_file: "config/safety.yml"
    emergency_stop_enabled: true
    max_action_rate: 10  # actions per second
    
  # MCP server connections
  mcp_servers:
    - name: "warehouse_system"
      uri: "mcp://warehouse.local:8080"
      tools:
        - "move_item"
        - "scan_barcode"
        - "update_inventory"
        - "get_item_location"
    
    - name: "vision_system"
      uri: "mcp://vision.local:8081"
      resources:
        - "camera_feeds"
        - "object_detection_api"
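
The operator can only call tools that are listed under mcp_servers, so it is worth checking that every action the code relies on is declared. The sketch below works on the configuration after it has been parsed into a dictionary (the parsing step is left out); `declared_tools` and `undeclared_tools` are hypothetical helper names, and the dictionary reproduces only the mcp_servers part of the file above.

```python
from typing import Any, Dict, Iterable, List, Set


def declared_tools(config: Dict[str, Any]) -> Set[str]:
    """Collect every tool name listed under operator.mcp_servers."""
    servers = config.get("operator", {}).get("mcp_servers", [])
    tools: Set[str] = set()
    for server in servers:
        # Servers that expose only resources have no "tools" key.
        tools.update(server.get("tools", []))
    return tools


def undeclared_tools(config: Dict[str, Any], required: Iterable[str]) -> List[str]:
    """Return the required tool names that no MCP server declares."""
    available = declared_tools(config)
    return sorted(name for name in required if name not in available)


# Same structure as config/operator.yml, reduced to the MCP section.
example_config = {
    "operator": {
        "mcp_servers": [
            {
                "name": "warehouse_system",
                "uri": "mcp://warehouse.local:8080",
                "tools": ["move_item", "scan_barcode",
                          "update_inventory", "get_item_location"],
            },
            {
                "name": "vision_system",
                "uri": "mcp://vision.local:8081",
                "resources": ["camera_feeds", "object_detection_api"],
            },
        ]
    }
}

print(undeclared_tools(example_config, ["move_item", "pick_item"]))  # ['pick_item']
```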

Configure the VLA model in config/model.yml:

# config/model.yml
model:
  architecture: "rt2"
  
  # Model-specific parameters
  parameters:
    vision_encoder: "vit-base"
    language_model: "t5-base"
    action_space_size: 256
    max_sequence_length: 512
  
  # Optimization settings
  optimization:
    precision: "fp16"
    batch_size: 8
    enable_tensorrt: true
    enable_model_parallelism: false
  
  # Input/output specifications
  inputs:
    vision:
      format: "rgb"
      resolution: [224, 224]
      channels: 3
    language:
      tokenizer: "t5-tokenizer"
      max_length: 128
  
  outputs:
    actions:
      type: "discrete"
      num_actions: 256
    confidence:
      type: "float"
      range: [0.0, 1.0]
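
The input and optimization settings together determine how much image data each inference batch carries. As a rough worked example, the size of one batch of camera frames can be estimated as follows. This is a sketch, not an Optum API: `vision_batch_bytes` is a hypothetical helper, and the bytes-per-value table is an assumption about how the precision setting maps to storage.

```python
from typing import Sequence

# Assumed storage size per value for each precision setting.
BYTES_PER_VALUE = {"fp32": 4, "fp16": 2}


def vision_batch_bytes(batch_size: int, resolution: Sequence[int],
                       channels: int, precision: str) -> int:
    """Estimate the memory taken by one batch of input frames."""
    height, width = resolution
    return batch_size * height * width * channels * BYTES_PER_VALUE[precision]


# Values taken from config/model.yml above.
size = vision_batch_bytes(batch_size=8, resolution=(224, 224),
                          channels=3, precision="fp16")
print(size)                # 2408448 bytes
print(size / 1024 ** 2)    # 2.296875 MiB
```

The input batch is therefore small compared with the model weights. Note that the preprocessing code in Step 4 produces float32 frames, so the fp16 figure applies only if the frames are cast before inference.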

Set up safety rules in config/safety.yml:

# config/safety.yml
safety_rules:
  # Physical boundaries
  physical_constraints:
    workspace_bounds:
      x: [-5.0, 5.0]  # meters
      y: [-5.0, 5.0]  # meters
      z: [0.0, 3.0]   # meters
    
    forbidden_zones:
      - name: "human_workspace"
        bounds: 
          x: [-1.0, 1.0]
          y: [-1.0, 1.0]
          z: [0.0, 2.0]
      
      - name: "emergency_exit"
        bounds:
          x: [4.0, 5.0]
          y: [-1.0, 1.0]
          z: [0.0, 3.0]
  
  # Behavioral constraints
  behavioral_limits:
    max_speed: 2.0  # m/s
    max_acceleration: 1.0  # m/s²
    max_force: 50.0  # Newtons
    
    action_timeouts:
      move_item: 30  # seconds
      scan_barcode: 5  # seconds
      update_inventory: 10  # seconds
  
  # Monitoring thresholds
  monitoring:
    cpu_usage_threshold: 85  # percent
    memory_usage_threshold: 90  # percent
    error_rate_threshold: 5  # percent
    
    safety_violations_max: 3  # before emergency stop
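
To make the physical constraints concrete, the sketch below checks a target position against the workspace bounds and forbidden zones from the file above. How Optum Protocol enforces these rules internally is not described in this guide, so treat this as an illustration only: `check_position` is a hypothetical helper, and treating the bounds as inclusive is an assumption.

```python
from typing import Dict, Optional, Tuple

Bounds = Dict[str, Tuple[float, float]]
Point = Dict[str, float]

# Values copied from config/safety.yml (meters).
WORKSPACE: Bounds = {"x": (-5.0, 5.0), "y": (-5.0, 5.0), "z": (0.0, 3.0)}
FORBIDDEN_ZONES: Dict[str, Bounds] = {
    "human_workspace": {"x": (-1.0, 1.0), "y": (-1.0, 1.0), "z": (0.0, 2.0)},
    "emergency_exit": {"x": (4.0, 5.0), "y": (-1.0, 1.0), "z": (0.0, 3.0)},
}


def inside(point: Point, bounds: Bounds) -> bool:
    """True if the point lies within the bounds on every axis (inclusive)."""
    return all(bounds[axis][0] <= point[axis] <= bounds[axis][1]
               for axis in ("x", "y", "z"))


def check_position(point: Point) -> Optional[str]:
    """Return None if the position is allowed, otherwise the reason it is not."""
    if not inside(point, WORKSPACE):
        return "outside workspace_bounds"
    for name, bounds in FORBIDDEN_ZONES.items():
        if inside(point, bounds):
            return f"inside forbidden zone '{name}'"
    return None


print(check_position({"x": 3.0, "y": 3.0, "z": 1.0}))  # None
print(check_position({"x": 0.0, "y": 0.0, "z": 1.0}))  # inside forbidden zone 'human_workspace'
```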

Step 3: Implement the Operator

Create the main operator class in src/operator.py:

# src/operator.py
import asyncio
from typing import Dict, Any, Optional
from optum import BaseOperator, MCPClient, VLAModel
from .perception import PerceptionModule
from .reasoning import ReasoningModule
from .actions import ActionModule

class WarehouseOperator(BaseOperator):
    def __init__(self, config: Dict[str, Any]):
        super().__init__(config)
        
        # Initialize components
        self.perception = PerceptionModule(config.get('perception', {}))
        self.reasoning = ReasoningModule(config.get('reasoning', {}))
        self.actions = ActionModule(config.get('actions', {}))
        
        # Initialize VLA model
        self.vla_model = VLAModel.load(
            config['model']['name'],
            config['model']['config_file']
        )
        
        # Initialize MCP client
        self.mcp_client = MCPClient(config['mcp_servers'])
        
        # Operator state
        self.current_task = None
        self.environment_state = {}
        
    async def initialize(self):
        """Initialize the operator and its components."""
        await self.perception.initialize()
        await self.reasoning.initialize()
        await self.actions.initialize()
        await self.mcp_client.connect()
        
        self.logger.info("Warehouse operator initialized successfully")
    
    async def perceive(self) -> Dict[str, Any]:
        """Gather information from the environment."""
        # Get visual input from cameras
        visual_data = await self.perception.get_visual_data()
        
        # Get sensor data
        sensor_data = await self.perception.get_sensor_data()
        
        # Get system status
        system_status = await self.mcp_client.read_resource(
            "warehouse_system://status"
        )
        
        # Combine all perceptual data
        perception_data = {
            'visual': visual_data,
            'sensors': sensor_data,
            'system': system_status,
            'timestamp': self.get_current_timestamp()
        }
        
        return perception_data
    
    async def reason(self, perception_data: Dict[str, Any]) -> Dict[str, Any]:
        """Process perceptual data and decide on actions."""
        # Prepare input for VLA model
        vla_input = self.prepare_vla_input(perception_data)
        
        # Get action predictions from VLA model
        vla_output = await self.vla_model.predict(vla_input)
        
        # Process VLA output through reasoning module
        reasoning_result = await self.reasoning.process(
            perception_data,
            vla_output
        )
        
        return reasoning_result
    
    async def act(self, reasoning_result: Dict[str, Any]) -> Dict[str, Any]:
        """Execute planned actions."""
        planned_actions = reasoning_result.get('planned_actions', [])
        
        execution_results = []
        for action in planned_actions:
            try:
                # Execute action through MCP
                result = await self.execute_action(action)
                execution_results.append({
                    'action': action,
                    'result': result,
                    'status': 'success'
                })
                
            except Exception as e:
                self.logger.error(f"Action execution failed: {e}")
                execution_results.append({
                    'action': action,
                    'error': str(e),
                    'status': 'failed'
                })
                
                # Handle failure recovery
                await self.handle_action_failure(action, e)
        
        return {
            'executed_actions': execution_results,
            'success_rate': self.calculate_success_rate(execution_results)
        }
    
    async def execute_action(self, action: Dict[str, Any]) -> Any:
        """Execute a specific action using MCP tools."""
        action_type = action['type']
        action_params = action['parameters']
        
        if action_type == 'move_item':
            return await self.mcp_client.call_tool(
                'move_item',
                {
                    'item_id': action_params['item_id'],
                    'source_location': action_params['source'],
                    'target_location': action_params['target']
                }
            )
        
        elif action_type == 'scan_barcode':
            return await self.mcp_client.call_tool(
                'scan_barcode',
                {
                    'location': action_params['location']
                }
            )
        
        elif action_type == 'update_inventory':
            return await self.mcp_client.call_tool(
                'update_inventory',
                {
                    'item_id': action_params['item_id'],
                    'quantity': action_params['quantity'],
                    'location': action_params['location']
                }
            )
        
        else:
            raise ValueError(f"Unknown action type: {action_type}")
    
    def prepare_vla_input(self, perception_data: Dict[str, Any]) -> Dict[str, Any]:
        """Prepare input for the VLA model."""
        # Extract visual features
        visual_input = self.perception.extract_visual_features(
            perception_data['visual']
        )
        
        # Prepare language context
        language_input = self.prepare_language_context(perception_data)
        
        return {
            'vision': visual_input,
            'language': language_input,
            'context': {
                'timestamp': perception_data['timestamp'],
                'system_state': perception_data['system']
            }
        }
    
    def prepare_language_context(self, perception_data: Dict[str, Any]) -> str:
        """Create language context for the VLA model."""
        if self.current_task:
            task_description = f"Current task: {self.current_task['description']}"
        else:
            task_description = "Waiting for task assignment"
        
        system_status = perception_data['system']['status']
        
        context = f"""
        {task_description}
        
        System Status: {system_status}
        Available Actions: move_item, scan_barcode, update_inventory
        
        Please analyze the current situation and decide on the best action to take.
        """
        
        return context.strip()
    
    async def handle_action_failure(self, action: Dict[str, Any], error: Exception):
        """Handle action execution failures."""
        self.logger.warning(f"Action failed: {action}, Error: {error}")
        
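        # Implement recovery strategies
        if "network" in str(error).lower():
            # Network error - back off before the next cycle. The failed
            # action is not re-executed here; it runs again only if a
            # later reasoning step plans it.
            await asyncio.sleep(1.0)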
        
        elif "safety" in str(error).lower():
            # Safety violation - emergency stop
            await self.emergency_stop()
        
        else:
            # General error - log and continue
            await self.log_error(action, error)
    
    async def main_loop(self):
        """Main operator execution loop."""
        while self.is_running:
            try:
                # Perceive-Reason-Act cycle
                perception_data = await self.perceive()
                reasoning_result = await self.reason(perception_data)
                action_result = await self.act(reasoning_result)
                
                # Update internal state
                self.update_state(perception_data, reasoning_result, action_result)
                
                # Safety check
                await self.safety_check()
                
                # Brief pause before next cycle
                await asyncio.sleep(0.1)  # caps the loop at 10 Hz; cycle runtime lowers the actual rate
                
            except Exception as e:
                self.logger.error(f"Main loop error: {e}")
                await self.handle_main_loop_error(e)
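
The network branch of handle_action_failure waits but does not run the failed action again. If you want an actual retry, one option is a small helper with exponential backoff, sketched below. `retry_async` is a hypothetical helper, and the sketch assumes that network failures surface as ConnectionError; check which exception type your MCP client raises before relying on it.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_async(
    operation: Callable[[], Awaitable[T]],
    attempts: int = 3,
    base_delay: float = 1.0,
) -> T:
    """Run operation, retrying on ConnectionError with exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(attempts):
        try:
            return await operation()
        except ConnectionError:
            if attempt == attempts - 1:
                raise  # out of attempts: let the caller handle the error
            # Wait base_delay, then 2x, 4x, ... before the next attempt.
            await asyncio.sleep(base_delay * (2 ** attempt))
    raise AssertionError("unreachable")
```

Inside the operator this could be used as `result = await retry_async(lambda: self.execute_action(action))`. Keep the attempt count low for physical actions such as move_item, since repeating a partially completed move may not be safe.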

Step 4: Implement Perception Module

Create src/perception.py:

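# src/perception.py
import time
import cv2
import numpy as np
from typing import Dict, Any, List

class PerceptionModule:
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.cameras = []
        self.sensors = []

    def get_timestamp(self) -> float:
        """Return the current time as seconds since the Unix epoch."""
        return time.time()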
    
    async def initialize(self):
        """Initialize cameras and sensors."""
        # Initialize cameras
        for camera_config in self.config.get('cameras', []):
            camera = cv2.VideoCapture(camera_config['device_id'])
            self.cameras.append({
                'capture': camera,
                'name': camera_config['name'],
                'position': camera_config['position']
            })
        
        # Initialize sensors (simulated for this example)
        self.sensors = self.config.get('sensors', [])
    
    async def get_visual_data(self) -> Dict[str, Any]:
        """Capture visual data from all cameras."""
        visual_data = {}
        
        for camera in self.cameras:
            ret, frame = camera['capture'].read()
            if ret:
                # Preprocess frame
                processed_frame = self.preprocess_frame(frame)
                
                visual_data[camera['name']] = {
                    'frame': processed_frame,
                    'timestamp': self.get_timestamp(),
                    'camera_position': camera['position']
                }
        
        return visual_data
    
    def preprocess_frame(self, frame: np.ndarray) -> np.ndarray:
        """Preprocess camera frame for VLA model."""
        # Resize to model input size
        frame = cv2.resize(frame, (224, 224))
        
        # Convert BGR to RGB
        frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        
        # Normalize pixel values
        frame = frame.astype(np.float32) / 255.0
        
        return frame
    
    async def get_sensor_data(self) -> Dict[str, Any]:
        """Read data from all sensors."""
        sensor_data = {}
        
        for sensor in self.sensors:
            if sensor['type'] == 'temperature':
                # Simulate temperature reading
                sensor_data['temperature'] = np.random.normal(22.0, 1.0)
            
            elif sensor['type'] == 'humidity':
                # Simulate humidity reading
                sensor_data['humidity'] = np.random.normal(50.0, 5.0)
            
            elif sensor['type'] == 'proximity':
                # Simulate proximity sensor
                sensor_data['proximity'] = np.random.uniform(0.1, 2.0)
        
        return sensor_data
    
    def extract_visual_features(self, visual_data: Dict[str, Any]) -> np.ndarray:
        """Extract features from visual data for VLA model input."""
        # Combine frames from all cameras
        frames = []
        for camera_name, camera_data in visual_data.items():
            frames.append(camera_data['frame'])
        
        if frames:
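            # Concatenate frames along the channel axis. With N cameras the
            # result has shape (224, 224, 3 * N), so a model configured for
            # 3 channels only matches the single-camera case.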
            combined_frame = np.concatenate(frames, axis=2)
            return combined_frame
        else:
            # Return empty frame if no visual data
            return np.zeros((224, 224, 3), dtype=np.float32)
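
The way frames are combined determines the array shape the model receives. The sketch below compares the channel-wise concatenation used in extract_visual_features with stacking along a new leading axis, which keeps each frame at 3 channels. The function names are hypothetical, and which layout the VLA model expects is an assumption you should check against your model's input specification.

```python
import numpy as np
from typing import List


def combine_channelwise(frames: List[np.ndarray]) -> np.ndarray:
    """Join frames along the channel axis, as extract_visual_features does."""
    return np.concatenate(frames, axis=2)


def combine_as_batch(frames: List[np.ndarray]) -> np.ndarray:
    """Stack frames along a new leading axis, one entry per camera."""
    return np.stack(frames, axis=0)


# Two preprocessed frames, as produced by preprocess_frame for two cameras.
frames = [np.zeros((224, 224, 3), dtype=np.float32) for _ in range(2)]

print(combine_channelwise(frames).shape)  # (224, 224, 6)
print(combine_as_batch(frames).shape)     # (2, 224, 224, 3)
```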

Step 5: Deploy Your Operator

Build and deploy your operator:

# Build the operator container
optum build

# Test locally
optum test --local

# Deploy to development environment
optum deploy --env development

# Monitor deployment
optum status warehouse-operator

# View logs
optum logs warehouse-operator --follow

Step 6: Monitor and Iterate

Use the Optum Protocol dashboard to monitor your operator:

# Open monitoring dashboard
optum dashboard

# Get performance metrics
optum metrics warehouse-operator

# Debug specific issues
optum debug warehouse-operator --issue-id abc123

Next Steps

Now that you have a basic operator running, you can:

  1. Add more sophisticated reasoning by implementing custom logic in the reasoning module
  2. Integrate additional sensors and expand the perception capabilities
  3. Implement learning mechanisms to improve performance over time
  4. Add multi-operator coordination for fleet-based operations
  5. Enhance safety protocols with more sophisticated monitoring

Continue to the advanced guides to learn about these topics and more complex operator implementations.
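
As a starting point for the first item, here is a minimal sketch of what src/reasoning.py could contain. It matches the calls made in Step 3 (initialize, and process returning a dictionary with planned_actions), but the shape of the VLA output is an assumption: the "candidates" list, its fields, and the min_confidence setting are hypothetical names chosen for this example.

```python
import asyncio
from typing import Any, Dict, List


class ReasoningModule:
    """Minimal reasoning step: keep only known, sufficiently confident actions."""

    KNOWN_ACTIONS = {"move_item", "scan_barcode", "update_inventory"}

    def __init__(self, config: Dict[str, Any]):
        # 'min_confidence' is a hypothetical config key for this sketch.
        self.min_confidence = float(config.get("min_confidence", 0.8))

    async def initialize(self) -> None:
        """Nothing to set up in this sketch."""

    async def process(self, perception_data: Dict[str, Any],
                      vla_output: Dict[str, Any]) -> Dict[str, Any]:
        """Turn model candidates into the planned_actions list used by act()."""
        planned: List[Dict[str, Any]] = []
        for candidate in vla_output.get("candidates", []):
            if candidate.get("type") not in self.KNOWN_ACTIONS:
                continue  # ignore actions the operator cannot execute
            if candidate.get("confidence", 0.0) < self.min_confidence:
                continue  # ignore low-confidence predictions
            planned.append({
                "type": candidate["type"],
                "parameters": candidate.get("parameters", {}),
            })
        return {"planned_actions": planned}
```

Each planned action carries the type and parameters keys that execute_action reads, so the output can be passed to act() unchanged.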

Troubleshooting

Common issues and solutions:

Model Loading Issues:

# Check model availability
optum model list --type vla

# Download specific model
optum model download rt2-base

# Verify model configuration
optum model validate config/model.yml

MCP Connection Issues:

# Test MCP server connectivity
optum mcp test warehouse_system

# Debug MCP communication
optum mcp debug --server warehouse_system --verbose
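
If the CLI checks fail, it can help to rule out basic network problems first. The sketch below extracts the host and port from a server URI such as the ones in config/operator.yml and tries to open a TCP connection. It uses only the Python standard library; the helper names are hypothetical, and it assumes the MCP server listens on plain TCP at that host and port.

```python
import socket
from typing import Tuple
from urllib.parse import urlsplit


def parse_mcp_uri(uri: str) -> Tuple[str, int]:
    """Extract (host, port) from a URI such as mcp://warehouse.local:8080."""
    parts = urlsplit(uri)
    if parts.hostname is None or parts.port is None:
        raise ValueError(f"URI needs a host and a port: {uri}")
    return parts.hostname, parts.port


def is_reachable(uri: str, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to the server can be opened."""
    host, port = parse_mcp_uri(uri)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers DNS failures, refused connections, and timeouts.
        return False


if __name__ == "__main__":
    for uri in ("mcp://warehouse.local:8080", "mcp://vision.local:8081"):
        print(uri, "reachable" if is_reachable(uri) else "unreachable")
```

A successful connection only shows that the port is open; it does not confirm that the service behind it speaks MCP.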

Resource Allocation Issues:

# Check available resources
optum resources --node-info

# Adjust resource requirements
optum config edit config/operator.yml
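
When adjusting the resource requirements, it can be useful to compare the requested values with what the machine actually has. The sketch below parses the cpu and memory strings in the format used in config/operator.yml and compares the CPU request with the local core count. The helper names are hypothetical, and the assumption that "Gi" means 1024³ bytes follows the common binary-unit convention rather than anything stated in this guide.

```python
import os
import re

# Binary unit suffixes and their size in bytes (assumed convention).
_UNITS = {"Ki": 1024, "Mi": 1024 ** 2, "Gi": 1024 ** 3, "Ti": 1024 ** 4}


def parse_quantity(text: str) -> int:
    """Convert a string such as '8Gi' into a number of bytes."""
    match = re.fullmatch(r"(\d+)(Ki|Mi|Gi|Ti)", text.strip())
    if match is None:
        raise ValueError(f"Unrecognized quantity: {text!r}")
    return int(match.group(1)) * _UNITS[match.group(2)]


def parse_cores(text: str) -> int:
    """Convert a string such as '4 cores' into an integer core count."""
    match = re.fullmatch(r"(\d+)\s*cores?", text.strip())
    if match is None:
        raise ValueError(f"Unrecognized CPU request: {text!r}")
    return int(match.group(1))


def cpu_shortfall(required: str) -> int:
    """Return how many cores are missing on this machine (0 if enough)."""
    available = os.cpu_count() or 0
    return max(0, parse_cores(required) - available)


print(parse_quantity("8Gi"))   # 8589934592
print(parse_cores("4 cores"))  # 4
```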

Your first AI operator is now built and deployed. Optum Protocol handles the orchestration, while your operator code covers the warehouse automation tasks you've defined.