Creating agentic AI to deploy an ARO cluster using Terraform with Red Hat OpenShift AI on ARO and Azure OpenAI

Learn how to create an artificial intelligence (AI) that can deploy Microsoft Azure Red Hat® OpenShift® clusters using Terraform. Explore prompt-based infrastructure and how it can be used to maintain clusters in your own environment.

Creating deployment agents for agentic AI using Terraform

5 mins

Now you will create the Azure Red Hat® OpenShift® deployment agent: a wrapper that dynamically generates Terraform configurations for Azure Red Hat OpenShift and manages the complete deployment lifecycle, including Terraform state, Azure authentication, and resource provisioning. Once that is in place, we'll tackle the orchestration layer of the agent.

What will you learn?

  • How to create deployment agents for Azure Red Hat OpenShift
  • How to create a simulator for the agent

What do you need before starting?

  • Azure Red Hat OpenShift cluster
  • Azure OpenAI model access
  • Setup file (setup.py) created in the previous step
  • Azure OpenAI parser (parser.py) created in the previous step
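
The deployment agent also expects four Azure service principal credentials as environment variables and will refuse to start without them. If you're working from a Python session or notebook, one way to provide them is shown below; the values are placeholders, not output from the setup step.

import os

# Placeholder values -- substitute your own service principal credentials.
# The agent's constructor checks for all four and raises if any is missing.
os.environ["AZURE_SUBSCRIPTION_ID"] = "<subscription-id>"
os.environ["AZURE_TENANT_ID"] = "<tenant-id>"
os.environ["AZURE_CLIENT_ID"] = "<service-principal-app-id>"
os.environ["AZURE_CLIENT_SECRET"] = "<service-principal-secret>"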

Creating the deployment agent

To begin with the deployment agent, copy the lines below and save them as deployment.py.

import os
import tempfile
import json
import subprocess
from pathlib import Path

class ARODeploymentAgent:
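    """Generate Terraform configs for ARO and drive the deploy/destroy lifecycle."""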
    def __init__(self, debug=False):
        self.debug = debug
        self.work_dir = Path(tempfile.mkdtemp())
        
        required_env_vars = ["AZURE_SUBSCRIPTION_ID", "AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_CLIENT_SECRET"]
        missing_vars = [var for var in required_env_vars if not os.getenv(var)]
        if missing_vars:
            raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}")
        
        self._login_to_azure()        
        self._create_terraform_files()

    def _run_command(self, command, cwd=None, stream_output=False):
        try:
            if stream_output:  
                process = subprocess.Popen(
                    command,
                    shell=True,
                    text=True,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                    cwd=cwd or self.work_dir,
                    env=os.environ.copy(),
                    bufsize=1  
                )
                output = ""
                for line in iter(process.stdout.readline, ''):
                    if line:
                        print(line.rstrip())
                        output += line
                process.wait()
                return process.returncode == 0, output
            else:
                result = subprocess.run(
                    command,
                    shell=True,
                    text=True,
                    capture_output=True,
                    cwd=cwd or self.work_dir,
                    env=os.environ.copy()
                )
                return result.returncode == 0, result.stdout if result.returncode == 0 else result.stderr
        except Exception as e:
            return False, str(e)
    
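    # Log in to the Azure CLI with the service principal so later az commands work.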
    def _login_to_azure(self):
        cmd = f"az login --service-principal --username \"{os.getenv('AZURE_CLIENT_ID')}\" --password \"{os.getenv('AZURE_CLIENT_SECRET')}\" --tenant \"{os.getenv('AZURE_TENANT_ID')}\""
        self._run_command(cmd)
    
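    # Render variables.tf, main.tf, and provider.tf into the temp working directory.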
    def _create_terraform_files(self):
        variables_tf = '''variable "cluster_name" {
  type        = string
  description = "ARO cluster name"
}
variable "location" {
  type        = string
  description = "Azure region"
}
variable "aro_version" {
  type        = string
  description = "ARO version"
}
variable "worker_node_count" {
  type        = number
  default     = 3
  description = "Number of worker nodes"
}
variable "worker_vm_size" {
  type        = string
  default     = "Standard_D4s_v3"
  description = "VM size for worker nodes"
}
variable "subscription_id" {
  type        = string
  description = "Azure Subscription ID"
}
variable "api_server_profile" {
  type        = string
  default     = "Public"
  description = "API Server visibility (Public or Private)"
}
variable "ingress_profile" {
  type        = string
  default     = "Public"
  description = "Ingress visibility (Public or Private)"
}'''
        
        main_tf = f'''locals {{
  resource_group_name = "${{var.cluster_name}}-rg"
}}
resource "azurerm_resource_group" "aro" {{
  name     = local.resource_group_name
  location = var.location
}}
resource "azurerm_virtual_network" "aro" {{
  name                = "${{var.cluster_name}}-vnet"
  location            = azurerm_resource_group.aro.location
  resource_group_name = azurerm_resource_group.aro.name
  address_space       = ["10.0.0.0/16"]
}}
resource "azurerm_subnet" "master_subnet" {{
  name                 = "master-subnet"
  resource_group_name  = azurerm_resource_group.aro.name
  virtual_network_name = azurerm_virtual_network.aro.name
  address_prefixes     = ["10.0.0.0/23"]
  service_endpoints    = ["Microsoft.ContainerRegistry"]
  private_endpoint_network_policies_enabled = false
  private_link_service_network_policies_enabled = false
}}
resource "azurerm_subnet" "worker_subnet" {{
  name                 = "worker-subnet"
  resource_group_name  = azurerm_resource_group.aro.name
  virtual_network_name = azurerm_virtual_network.aro.name
  address_prefixes     = ["10.0.2.0/23"]
  service_endpoints    = ["Microsoft.ContainerRegistry"]
  private_endpoint_network_policies_enabled = false
  private_link_service_network_policies_enabled = false
}}
resource "azurerm_redhat_openshift_cluster" "aro" {{
  name                = var.cluster_name
  location            = azurerm_resource_group.aro.location
  resource_group_name = azurerm_resource_group.aro.name
  cluster_profile {{
    domain                     = var.cluster_name
    version                    = var.aro_version
    managed_resource_group_name = "${{azurerm_resource_group.aro.name}}-managed"
  }}
  network_profile {{
    pod_cidr     = "10.128.0.0/14"
    service_cidr = "172.30.0.0/16"
  }}
  main_profile {{
    vm_size   = "Standard_D8s_v3"
    subnet_id = azurerm_subnet.master_subnet.id
  }}
  worker_profile {{
    subnet_id    = azurerm_subnet.worker_subnet.id
    disk_size_gb = 128
    node_count   = var.worker_node_count
    vm_size      = var.worker_vm_size
  }}
  service_principal {{
    client_id     = "{os.getenv("AZURE_CLIENT_ID")}"
    client_secret = "{os.getenv("AZURE_CLIENT_SECRET")}"
  }}
  api_server_profile {{
    visibility = var.api_server_profile
  }}
  ingress_profile {{
    visibility = var.ingress_profile
  }}
}}
output "console_url" {{
  value = azurerm_redhat_openshift_cluster.aro.console_url
}}
output "api_url" {{
  value = azurerm_redhat_openshift_cluster.aro.api_server_profile[0].url
}}'''
        
        provider_tf = f'''terraform {{
  required_providers {{
    azurerm = {{
      source  = "hashicorp/azurerm"
      version = "~>3.0"
    }}
  }}
}}
provider "azurerm" {{
  features {{}}
  subscription_id = var.subscription_id
  tenant_id       = "{os.getenv("AZURE_TENANT_ID")}"
  client_id       = "{os.getenv("AZURE_CLIENT_ID")}"
  client_secret   = "{os.getenv("AZURE_CLIENT_SECRET")}"
}}'''
        
        (self.work_dir / "variables.tf").write_text(variables_tf)
        (self.work_dir / "main.tf").write_text(main_tf)
        (self.work_dir / "provider.tf").write_text(provider_tf)
            
    def create_terraform_vars(self, params):
        cluster_name = params.get("name", "agentic-aro")
        region = params.get("region", "westus")
        worker_vm_size = params.get("worker_vm_size", "Standard_D4s_v3")
        worker_node_count = params.get("worker_node_count", 3)
        aro_version = params.get("version") or self.get_latest_aro_version(region)
        is_private = params.get("is_private", False)
        
        tfvars_content = f'''subscription_id = "{os.getenv('AZURE_SUBSCRIPTION_ID')}"
cluster_name = "{cluster_name}"
location = "{region}"
aro_version = "{aro_version}"
worker_node_count = {worker_node_count}
worker_vm_size = "{worker_vm_size}"
api_server_profile = "{'Private' if is_private else 'Public'}"
ingress_profile = "{'Private' if is_private else 'Public'}"'''
        
        (self.work_dir / "terraform.tfvars").write_text(tfvars_content)
        return str(self.work_dir)
                    
    def get_latest_aro_version(self, region="westus"):
        cmd = f"az aro get-versions -l {region} --output tsv"
        success, output = self._run_command(cmd)
        
        if success and output.strip():
            versions = [v.strip() for v in output.strip().split('\n') if v.strip()]
            if versions:
                stable_versions = [v for v in versions if not any(x in v for x in ['-tech-preview', '-dev', 'nightly'])]
                if stable_versions:
                    stable_versions.sort(reverse=True)
                    return stable_versions[0]
        
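        # Hard-coded fallback used when the az CLI version lookup fails.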
        return "4.16.30"

    def deploy_cluster(self, params):
        print(f"Processing deployment request for cluster: {params.get('name', 'agentic-aro')}")
        
        rg_name = f"{params.get('name', 'agentic-aro')}-rg"
        cmd = f"az group exists --name {rg_name} --output tsv"
        success, output = self._run_command(cmd)
        
        if success and output.strip().lower() == 'true':
            return {"status": "error", "message": f"Resource group {rg_name} already exists"}
        
        try:
            work_dir = self.create_terraform_vars(params)
    
            print("Running terraform init...")
            success, output = self._run_command("terraform init", cwd=work_dir, stream_output=True)
            if not success:
                return {"status": "error", "message": f"Terraform init failed: {output}"}
    
            print("\nRunning terraform plan...")
            success, output = self._run_command("terraform plan -out=aro.plan", cwd=work_dir, stream_output=True)
            if not success:
                return {"status": "error", "message": f"Terraform plan failed: {output}"}
    
            print("\nRunning terraform apply (this takes 30-45 minutes)...")
            success, output = self._run_command("terraform apply -auto-approve aro.plan", cwd=work_dir, stream_output=True)
            if not success:
                return {"status": "error", "message": f"Terraform apply failed: {output}"}
            
            print("\nDeployment completed!")
            
            # Get outputs
            success, output = self._run_command("terraform output -json", cwd=work_dir)
            if success:
                try:
                    outputs = json.loads(output)
                    console_url = outputs.get("console_url", {}).get("value")
                    api_url = outputs.get("api_url", {}).get("value")
                    
                    cluster_name = params.get("name", "agentic-aro")
                    cmd = f"az aro list-credentials --resource-group {rg_name} --name {cluster_name} --output json"
                    success, creds_output = self._run_command(cmd)
                    
                    if success:
                        credentials = json.loads(creds_output)
                        username = credentials.get("kubeadminUsername")
                        password = credentials.get("kubeadminPassword")
                        
                        return {
                            "status": "success",
                            "message": f"Successfully deployed ARO cluster {cluster_name}!",
                            "console_url": console_url,
                            "api_url": api_url,
                            "resource_group": rg_name,
                            "username": username,
                            "password": password
                        }
                    else:
                        return {
                            "status": "success",
                            "message": f"ARO cluster deployed",
                            "console_url": console_url,
                            "api_url": api_url,
                            "resource_group": rg_name
                        }
                except Exception:  # output parsing is best-effort; the cluster is already up
                    return {"status": "success", "message": "ARO cluster deployed"}
            
            return {"status": "success", "message": "ARO cluster deployed successfully!"}
            
        except Exception as e:
            return {"status": "error", "message": f"Deployment failed: {str(e)}"}
            
    def destroy_cluster(self, resource_group):
        print(f"Starting destruction of resource group: {resource_group}")
        
        try:
            cmd = f"az group exists --name {resource_group} --output tsv"
            success, output = self._run_command(cmd)
            
            if not success or output.strip().lower() != 'true':
                return {"status": "error", "message": f"Resource group {resource_group} not found"}
            
            print("Deleting resource group (this may take several minutes)...")
            cmd = f"az group delete --name {resource_group} --yes"
            success, output = self._run_command(cmd)
            
            if success:
                print("Destruction completed!")
                return {"status": "success", "message": f"ARO cluster in {resource_group} destroyed"}
            else:
                return {"status": "error", "message": f"Failed to destroy: {output}"}
        except Exception as e:
            return {"status": "error", "message": f"Error: {str(e)}"}

Once this file is saved, the deployment agent is complete. Before adding the orchestration layer, you can optionally smoke-test it directly, as shown below.
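
Assuming the four Azure environment variables are exported, the agent can be driven from a Python shell with the same parameter dictionary the parser will later produce. A minimal sketch (the cluster name and region here are illustrative):

from deployment import ARODeploymentAgent

# The same parameter shape the Azure OpenAI parser produces; omitted keys
# fall back to the defaults in create_terraform_vars().
params = {
    "name": "agentic-aro",
    "region": "westus",
    "worker_node_count": 3,
    "worker_vm_size": "Standard_D4s_v3",
    "is_private": False,
}

agent = ARODeploymentAgent(debug=True)
result = agent.deploy_cluster(params)  # terraform init/plan/apply, 30-45 minutes
print(result["status"], result.get("console_url"))

# Clean up when finished:
# agent.destroy_cluster(f"{params['name']}-rg")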

Creating the simulator

Next, you will create the orchestrator, the brain of the agent. This simulator is a high-level orchestration layer: it inspects each natural-language request, routes it to a deployment or deletion operation, and carries that operation out either for real or as a mock run, depending on how it was configured.

Copy the lines below and save them as simulator.py.

import time
from pathlib import Path

class AROAzureOpenAISimulator:
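    """Route natural-language requests to deploy or destroy operations, real or mocked."""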
    def __init__(self, debug=False, mock=False, azure_openai_deployment="gpt-4o-mini"):
        self.debug = debug
        self.mock = mock
        self.azure_openai_deployment = azure_openai_deployment
        self.openai_parser = None
        self.setup_status = None
        self.cluster_temp_dirs = {}
    
    def setup(self):
        from setup import setup_environment
        self.setup_status = setup_environment()
        
        from parser import AzureOpenAIParser
        self.openai_parser = AzureOpenAIParser(
            deployment_name=self.azure_openai_deployment,
            debug=self.debug
        )
        
        return self.setup_status
    
    def process_request(self, request):
        if any(word in request.lower() for word in ["delete", "destroy", "remove", "tear down"]):
            return self._handle_deletion(request)
        else:
            return self._handle_deployment(request)
    
    def _handle_deployment(self, request):
        print(f"Parsing request: '{request}'")
        params = self.openai_parser.extract_aro_parameters(request)
        
        if not params:
            return {"status": "error", "message": "Failed to extract parameters"}
        
        print(f"Extracted parameters: {params}")
        return self._deploy_aro_cluster(params)
    
    def _handle_deletion(self, request):
        params = self.openai_parser.extract_deletion_parameters(request)
        
        if not params or not params.get("name"):
            return {"status": "error", "message": "Failed to extract cluster name"}
        
        return self._destroy_aro_cluster(params.get("name"))
    
    def _deploy_aro_cluster(self, params):
        cluster_name = params.get('name', 'agentic-aro')
        
        if self.mock:
            return self._mock_aro_deployment(params)
        
        print("\n" + "="*60)
        print(f"Starting ARO Cluster Deployment: {cluster_name}")
        print("="*60 + "\n")
        
        from deployment import ARODeploymentAgent
        try:
            aro_deployer = ARODeploymentAgent(debug=self.debug)
            result = aro_deployer.deploy_cluster(params)
            
            if result.get("status") == "success" and hasattr(aro_deployer, "work_dir"):
                self.cluster_temp_dirs[cluster_name] = str(aro_deployer.work_dir)
            
            return result
        except Exception as e:
            return {"status": "error", "message": f"Error deploying ARO cluster: {str(e)}"}
    
    def _destroy_aro_cluster(self, cluster_name):
        resource_group = f"{cluster_name}-rg"
        
        if self.mock:
            return self._mock_aro_destroy(cluster_name)
        
        from deployment import ARODeploymentAgent
        try:
            aro_deployer = ARODeploymentAgent(debug=self.debug)
            
            if cluster_name in self.cluster_temp_dirs:
                aro_deployer.work_dir = Path(self.cluster_temp_dirs[cluster_name])
            
            result = aro_deployer.destroy_cluster(resource_group)
            
            if result.get("status") == "success" and cluster_name in self.cluster_temp_dirs:
                del self.cluster_temp_dirs[cluster_name]
            
            return result
        except Exception as e:
            return {"status": "error", "message": f"Error destroying ARO cluster: {str(e)}"}
    
    def _mock_aro_deployment(self, params):
        cluster_name = params.get('name', 'agentic-aro')
        region = params.get('region', 'westus')
        rg_name = f"{cluster_name}-rg"
        
        time.sleep(6)
        
        console_url = f"https://console-openshift-console.apps.{cluster_name}.{region}.aroapp.io"
        api_url = f"https://api.{cluster_name}.{region}.aroapp.io:6443"
        
        return {
            "status": "success",
            "message": f"[MOCK] Successfully deployed ARO cluster {cluster_name}",
            "console_url": console_url,
            "api_url": api_url,
            "resource_group": rg_name,
            "username": "kubeadmin",
            "password": "Mock-Pass-123!"
        }
    
    def _mock_aro_destroy(self, cluster_name):
        resource_group = f"{cluster_name}-rg"
        time.sleep(3)
        
        return {
            "status": "success",
            "message": f"[MOCK] ARO cluster {cluster_name} destroyed"
        }

After saving this file, you can give the simulator a quick mock-mode run, shown below, and then proceed to integrate all our files and credentials into a notebook.
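
Mock mode skips Terraform and Azure entirely, but setup() still loads setup.py and parser.py from the earlier steps so the request can be parsed. A quick check (the request wording is illustrative):

import json
from simulator import AROAzureOpenAISimulator

sim = AROAzureOpenAISimulator(debug=True, mock=True)
sim.setup()  # loads setup.py and parser.py created earlier

result = sim.process_request("Deploy an ARO cluster called demo in eastus")
print(json.dumps(result, indent=2))

# Requests containing "delete", "destroy", "remove", or "tear down"
# are routed to the deletion path instead.
print(sim.process_request("Destroy the demo cluster"))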
