Creating deployment agents for agentic AI using Terraform
Now you will create the Azure Red Hat® OpenShift® deployment agent: a wrapper that dynamically generates Terraform configurations for Azure Red Hat OpenShift deployments and manages the complete lifecycle, including state management, Azure authentication, and resource provisioning. You will then build the orchestration layer that drives the agent.
What will you learn?
- How to create deployment agents for Azure Red Hat OpenShift
- How to create a simulator for the agent
What do you need before starting?
- Azure Red Hat OpenShift cluster
- Azure OpenAI model access
- Setup file (setup.py) created previously
- Azure OpenAI parser (parser.py) created previously
Creating the deployment agent
To begin, copy the code below and save it as deployment.py.
import os
import tempfile
import json
import subprocess
from pathlib import Path


class ARODeploymentAgent:
    def __init__(self, debug=False):
        self.debug = debug
        # Each agent instance gets its own temporary working directory so
        # Terraform state never collides between deployments.
        self.work_dir = Path(tempfile.mkdtemp())
        required_env_vars = ["AZURE_SUBSCRIPTION_ID", "AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_CLIENT_SECRET"]
        missing_vars = [var for var in required_env_vars if not os.getenv(var)]
        if missing_vars:
            raise ValueError(f"Missing required environment variables: {', '.join(missing_vars)}")
        self._login_to_azure()
        self._create_terraform_files()

    def _run_command(self, command, cwd=None, stream_output=False):
        """Run a shell command, optionally streaming its output live."""
        try:
            if stream_output:
                # Stream long-running commands (terraform init/plan/apply)
                # line by line so progress is visible as it happens.
                process = subprocess.Popen(
                    command,
                    shell=True,
                    text=True,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.STDOUT,
                    cwd=cwd or self.work_dir,
                    env=os.environ.copy(),
                    bufsize=1
                )
                output = ""
                for line in iter(process.stdout.readline, ''):
                    if line:
                        print(line.rstrip())
                        output += line
                process.wait()
                return process.returncode == 0, output
            else:
                result = subprocess.run(
                    command,
                    shell=True,
                    text=True,
                    capture_output=True,
                    cwd=cwd or self.work_dir,
                    env=os.environ.copy()
                )
                return result.returncode == 0, result.stdout if result.returncode == 0 else result.stderr
        except Exception as e:
            return False, str(e)

    def _login_to_azure(self):
        # Authenticate the Azure CLI with the service principal credentials.
        cmd = f"az login --service-principal --username \"{os.getenv('AZURE_CLIENT_ID')}\" --password \"{os.getenv('AZURE_CLIENT_SECRET')}\" --tenant \"{os.getenv('AZURE_TENANT_ID')}\""
        self._run_command(cmd)

    def _create_terraform_files(self):
        # variables.tf: the inputs the agent fills in per deployment.
        variables_tf = '''variable "cluster_name" {
  type        = string
  description = "ARO cluster name"
}

variable "location" {
  type        = string
  description = "Azure region"
}

variable "aro_version" {
  type        = string
  description = "ARO version"
}

variable "worker_node_count" {
  type        = number
  default     = 3
  description = "Number of worker nodes"
}

variable "worker_vm_size" {
  type        = string
  default     = "Standard_D4s_v3"
  description = "VM size for worker nodes"
}

variable "subscription_id" {
  type        = string
  description = "Azure Subscription ID"
}

variable "api_server_profile" {
  type        = string
  default     = "Public"
  description = "API Server visibility (Public or Private)"
}

variable "ingress_profile" {
  type        = string
  default     = "Public"
  description = "Ingress visibility (Public or Private)"
}'''

        # main.tf: resource group, networking, and the ARO cluster itself.
        main_tf = f'''locals {{
  resource_group_name = "${{var.cluster_name}}-rg"
}}

resource "azurerm_resource_group" "aro" {{
  name     = local.resource_group_name
  location = var.location
}}

resource "azurerm_virtual_network" "aro" {{
  name                = "${{var.cluster_name}}-vnet"
  location            = azurerm_resource_group.aro.location
  resource_group_name = azurerm_resource_group.aro.name
  address_space       = ["10.0.0.0/16"]
}}

resource "azurerm_subnet" "master_subnet" {{
  name                 = "master-subnet"
  resource_group_name  = azurerm_resource_group.aro.name
  virtual_network_name = azurerm_virtual_network.aro.name
  address_prefixes     = ["10.0.0.0/23"]
  service_endpoints    = ["Microsoft.ContainerRegistry"]
  private_endpoint_network_policies_enabled     = false
  private_link_service_network_policies_enabled = false
}}

resource "azurerm_subnet" "worker_subnet" {{
  name                 = "worker-subnet"
  resource_group_name  = azurerm_resource_group.aro.name
  virtual_network_name = azurerm_virtual_network.aro.name
  address_prefixes     = ["10.0.2.0/23"]
  service_endpoints    = ["Microsoft.ContainerRegistry"]
  private_endpoint_network_policies_enabled     = false
  private_link_service_network_policies_enabled = false
}}

resource "azurerm_redhat_openshift_cluster" "aro" {{
  name                = var.cluster_name
  location            = azurerm_resource_group.aro.location
  resource_group_name = azurerm_resource_group.aro.name

  cluster_profile {{
    domain                      = var.cluster_name
    version                     = var.aro_version
    managed_resource_group_name = "${{azurerm_resource_group.aro.name}}-managed"
  }}

  network_profile {{
    pod_cidr     = "10.128.0.0/14"
    service_cidr = "172.30.0.0/16"
  }}

  main_profile {{
    vm_size   = "Standard_D8s_v3"
    subnet_id = azurerm_subnet.master_subnet.id
  }}

  worker_profile {{
    subnet_id    = azurerm_subnet.worker_subnet.id
    disk_size_gb = 128
    node_count   = var.worker_node_count
    vm_size      = var.worker_vm_size
  }}

  service_principal {{
    client_id     = "{os.getenv("AZURE_CLIENT_ID")}"
    client_secret = "{os.getenv("AZURE_CLIENT_SECRET")}"
  }}

  api_server_profile {{
    visibility = var.api_server_profile
  }}

  ingress_profile {{
    visibility = var.ingress_profile
  }}
}}

output "console_url" {{
  value = azurerm_redhat_openshift_cluster.aro.console_url
}}

output "api_url" {{
  value = azurerm_redhat_openshift_cluster.aro.api_server_profile[0].url
}}'''

        # provider.tf: pin the azurerm provider and pass in credentials.
        provider_tf = f'''terraform {{
  required_providers {{
    azurerm = {{
      source  = "hashicorp/azurerm"
      version = "~>3.0"
    }}
  }}
}}

provider "azurerm" {{
  features {{}}
  subscription_id = var.subscription_id
  tenant_id       = "{os.getenv("AZURE_TENANT_ID")}"
  client_id       = "{os.getenv("AZURE_CLIENT_ID")}"
  client_secret   = "{os.getenv("AZURE_CLIENT_SECRET")}"
}}'''

        (self.work_dir / "variables.tf").write_text(variables_tf)
        (self.work_dir / "main.tf").write_text(main_tf)
        (self.work_dir / "provider.tf").write_text(provider_tf)

    def create_terraform_vars(self, params):
        # Turn the parameters extracted by the LLM parser into a
        # terraform.tfvars file, filling in sensible defaults.
        cluster_name = params.get("name", "agentic-aro")
        region = params.get("region", "westus")
        worker_vm_size = params.get("worker_vm_size", "Standard_D4s_v3")
        worker_node_count = params.get("worker_node_count", 3)
        aro_version = params.get("version") or self.get_latest_aro_version(region)
        is_private = params.get("is_private", False)
        tfvars_content = f'''subscription_id    = "{os.getenv('AZURE_SUBSCRIPTION_ID')}"
cluster_name       = "{cluster_name}"
location           = "{region}"
aro_version        = "{aro_version}"
worker_node_count  = {worker_node_count}
worker_vm_size     = "{worker_vm_size}"
api_server_profile = "{'Private' if is_private else 'Public'}"
ingress_profile    = "{'Private' if is_private else 'Public'}"'''
        (self.work_dir / "terraform.tfvars").write_text(tfvars_content)
        return str(self.work_dir)

    def get_latest_aro_version(self, region="westus"):
        # Ask Azure which ARO versions are available in the region and pick
        # the newest stable one; fall back to a known-good version.
        cmd = f"az aro get-versions -l {region} --output tsv"
        success, output = self._run_command(cmd)
        if success and output.strip():
            versions = [v.strip() for v in output.strip().split('\n') if v.strip()]
            if versions:
                stable_versions = [v for v in versions if not any(x in v for x in ['-tech-preview', '-dev', 'nightly'])]
                if stable_versions:
                    # Sort numerically so that, e.g., 4.16 ranks above 4.9
                    # (a plain string sort would get this wrong).
                    stable_versions.sort(
                        key=lambda v: [int(p) if p.isdigit() else 0 for p in v.split('.')],
                        reverse=True
                    )
                    return stable_versions[0]
        return "4.16.30"

    def deploy_cluster(self, params):
        print(f"Processing deployment request for cluster: {params.get('name', 'agentic-aro')}")
        # Refuse to deploy over an existing resource group.
        rg_name = f"{params.get('name', 'agentic-aro')}-rg"
        cmd = f"az group exists --name {rg_name} --output tsv"
        success, output = self._run_command(cmd)
        if success and output.strip().lower() == 'true':
            return {"status": "error", "message": f"Resource group {rg_name} already exists"}
        try:
            work_dir = self.create_terraform_vars(params)
            print("Running terraform init...")
            success, output = self._run_command("terraform init", cwd=work_dir, stream_output=True)
            if not success:
                return {"status": "error", "message": f"Terraform init failed: {output}"}
            print("\nRunning terraform plan...")
            success, output = self._run_command("terraform plan -out=aro.plan", cwd=work_dir, stream_output=True)
            if not success:
                return {"status": "error", "message": f"Terraform plan failed: {output}"}
            print("\nRunning terraform apply (this takes 30-45 minutes)...")
            success, output = self._run_command("terraform apply -auto-approve aro.plan", cwd=work_dir, stream_output=True)
            if not success:
                return {"status": "error", "message": f"Terraform apply failed: {output}"}
            print("\nDeployment completed!")
            # Read back the Terraform outputs (console and API URLs).
            success, output = self._run_command("terraform output -json", cwd=work_dir)
            if success:
                try:
                    outputs = json.loads(output)
                    console_url = outputs.get("console_url", {}).get("value")
                    api_url = outputs.get("api_url", {}).get("value")
                    cluster_name = params.get("name", "agentic-aro")
                    # Fetch the kubeadmin credentials for the new cluster.
                    cmd = f"az aro list-credentials --resource-group {rg_name} --name {cluster_name} --output json"
                    success, creds_output = self._run_command(cmd)
                    if success:
                        credentials = json.loads(creds_output)
                        username = credentials.get("kubeadminUsername")
                        password = credentials.get("kubeadminPassword")
                        return {
                            "status": "success",
                            "message": f"Successfully deployed ARO cluster {cluster_name}!",
                            "console_url": console_url,
                            "api_url": api_url,
                            "resource_group": rg_name,
                            "username": username,
                            "password": password
                        }
                    else:
                        return {
                            "status": "success",
                            "message": "ARO cluster deployed",
                            "console_url": console_url,
                            "api_url": api_url,
                            "resource_group": rg_name
                        }
                except Exception:
                    # The cluster is up even if output parsing failed.
                    return {"status": "success", "message": "ARO cluster deployed"}
            return {"status": "success", "message": "ARO cluster deployed successfully!"}
        except Exception as e:
            return {"status": "error", "message": f"Deployment failed: {str(e)}"}

    def destroy_cluster(self, resource_group):
        print(f"Starting destruction of resource group: {resource_group}")
        try:
            cmd = f"az group exists --name {resource_group} --output tsv"
            success, output = self._run_command(cmd)
            if not success or output.strip().lower() != 'true':
                return {"status": "error", "message": f"Resource group {resource_group} not found"}
            print("Deleting resource group (this may take several minutes)...")
            # Deleting the resource group removes the cluster and everything
            # else the deployment created inside it.
            cmd = f"az group delete --name {resource_group} --yes"
            success, output = self._run_command(cmd)
            if success:
                print("Destruction completed!")
                return {"status": "success", "message": f"ARO cluster in {resource_group} destroyed"}
            else:
                return {"status": "error", "message": f"Failed to destroy: {output}"}
        except Exception as e:
            return {"status": "error", "message": f"Error: {str(e)}"}
Once this file is saved, the deployment agent is complete. Before moving on to the orchestration layer, you can try it standalone, as shown below.
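The following is a minimal sketch of driving the agent directly, assuming the four Azure environment variables are exported and the terraform and az CLIs are installed. The cluster name and parameter values are illustrative, and note that this performs a real deployment that takes 30-45 minutes.

# Quick standalone check of the deployment agent (minimal sketch).
# Assumes AZURE_SUBSCRIPTION_ID, AZURE_TENANT_ID, AZURE_CLIENT_ID, and
# AZURE_CLIENT_SECRET are exported; names below are illustrative.
from deployment import ARODeploymentAgent

agent = ARODeploymentAgent(debug=True)
result = agent.deploy_cluster({
    "name": "demo-aro",
    "region": "westus",
    "worker_node_count": 3,
    "worker_vm_size": "Standard_D4s_v3",
    "is_private": False
})
print(result["status"], result.get("console_url"))

# Tear everything down again when finished:
# agent.destroy_cluster("demo-aro-rg")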
Creating the simulator
Next, you will create the orchestrator, the brain of the agent. This simulator is a high-level orchestration layer that routes each request to a deployment or deletion operation, and it can run either mock or real deployments (and destructions), depending on how you configure it.
Copy the code below and save it as simulator.py.
import time
from pathlib import Path


class AROAzureOpenAISimulator:
    def __init__(self, debug=False, mock=False, azure_openai_deployment="gpt-4o-mini"):
        self.debug = debug
        self.mock = mock
        self.azure_openai_deployment = azure_openai_deployment
        self.openai_parser = None
        self.setup_status = None
        # Remember each cluster's Terraform working directory so a later
        # destroy request can reuse the same state.
        self.cluster_temp_dirs = {}

    def setup(self):
        # Imports are deferred so the class can be constructed before the
        # environment is fully configured.
        from setup import setup_environment
        self.setup_status = setup_environment()
        from parser import AzureOpenAIParser
        self.openai_parser = AzureOpenAIParser(
            deployment_name=self.azure_openai_deployment,
            debug=self.debug
        )
        return self.setup_status

    def process_request(self, request):
        # Keyword-based intent routing: deletion words trigger teardown,
        # anything else is treated as a deployment request.
        if any(word in request.lower() for word in ["delete", "destroy", "remove", "tear down"]):
            return self._handle_deletion(request)
        else:
            return self._handle_deployment(request)

    def _handle_deployment(self, request):
        print(f"Parsing request: '{request}'")
        params = self.openai_parser.extract_aro_parameters(request)
        if not params:
            return {"status": "error", "message": "Failed to extract parameters"}
        print(f"Extracted parameters: {params}")
        return self._deploy_aro_cluster(params)

    def _handle_deletion(self, request):
        params = self.openai_parser.extract_deletion_parameters(request)
        if not params or not params.get("name"):
            return {"status": "error", "message": "Failed to extract cluster name"}
        return self._destroy_aro_cluster(params.get("name"))

    def _deploy_aro_cluster(self, params):
        cluster_name = params.get('name', 'agentic-aro')
        if self.mock:
            return self._mock_aro_deployment(params)
        print("\n" + "=" * 60)
        print(f"Starting ARO Cluster Deployment: {cluster_name}")
        print("=" * 60 + "\n")
        from deployment import ARODeploymentAgent
        try:
            aro_deployer = ARODeploymentAgent(debug=self.debug)
            result = aro_deployer.deploy_cluster(params)
            if result.get("status") == "success" and hasattr(aro_deployer, "work_dir"):
                self.cluster_temp_dirs[cluster_name] = str(aro_deployer.work_dir)
            return result
        except Exception as e:
            return {"status": "error", "message": f"Error deploying ARO cluster: {str(e)}"}

    def _destroy_aro_cluster(self, cluster_name):
        resource_group = f"{cluster_name}-rg"
        if self.mock:
            return self._mock_aro_destroy(cluster_name)
        from deployment import ARODeploymentAgent
        try:
            aro_deployer = ARODeploymentAgent(debug=self.debug)
            # Reuse the original Terraform working directory if we have it.
            if cluster_name in self.cluster_temp_dirs:
                aro_deployer.work_dir = Path(self.cluster_temp_dirs[cluster_name])
            result = aro_deployer.destroy_cluster(resource_group)
            if result.get("status") == "success" and cluster_name in self.cluster_temp_dirs:
                del self.cluster_temp_dirs[cluster_name]
            return result
        except Exception as e:
            return {"status": "error", "message": f"Error destroying ARO cluster: {str(e)}"}

    def _mock_aro_deployment(self, params):
        # Simulate a successful deployment without touching Azure.
        cluster_name = params.get('name', 'agentic-aro')
        region = params.get('region', 'westus')
        rg_name = f"{cluster_name}-rg"
        time.sleep(6)
        console_url = f"https://console-openshift-console.apps.{cluster_name}.{region}.aroapp.io"
        api_url = f"https://api.{cluster_name}.{region}.aroapp.io:6443"
        return {
            "status": "success",
            "message": f"[MOCK] Successfully deployed ARO cluster {cluster_name}",
            "console_url": console_url,
            "api_url": api_url,
            "resource_group": rg_name,
            "username": "kubeadmin",
            "password": "Mock-Pass-123!"
        }

    def _mock_aro_destroy(self, cluster_name):
        # Simulate a successful teardown without touching Azure.
        time.sleep(3)
        return {
            "status": "success",
            "message": f"[MOCK] ARO cluster {cluster_name} destroyed"
        }
After saving this file, you can give the simulator a quick mock-mode smoke test, as shown below, and then proceed to integrate all of the files and credentials into a notebook.
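The sketch below runs the simulator in mock mode, so no Azure resources are created. It assumes the setup.py and parser.py files from the earlier lessons are importable, since setup() loads them; the request strings are illustrative.

# Mock-mode smoke test (minimal sketch; no Azure resources are created).
# Assumes setup.py and parser.py from earlier lessons are on the path.
from simulator import AROAzureOpenAISimulator

sim = AROAzureOpenAISimulator(debug=True, mock=True)
sim.setup()  # loads the environment and the Azure OpenAI parser
print(sim.process_request("Deploy an ARO cluster named demo-aro in westus"))
print(sim.process_request("Destroy the demo-aro cluster"))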