diff --git a/sre-agent-xpander.ai/.env.example b/sre-agent-xpander.ai/.env.example new file mode 100644 index 000000000..60ec31b35 --- /dev/null +++ b/sre-agent-xpander.ai/.env.example @@ -0,0 +1 @@ +OPENAI_API_KEY="your-openai-key" \ No newline at end of file diff --git a/sre-agent-xpander.ai/README.md b/sre-agent-xpander.ai/README.md new file mode 100644 index 000000000..ff0d6d4c6 --- /dev/null +++ b/sre-agent-xpander.ai/README.md @@ -0,0 +1,104 @@ +# Junior SRE-Agent + +This project demonstrates how to build a Site Reliability Engineer (SRE) agent using Agno's Agent framework and Xpander's event streaming. It supports two modes: + +* **Local CLI** for ad-hoc queries +* **Event Listener** for streaming chat events via Xpander Chat UI + +We use the following tech stack: + +* Agno's Agent framework +* Xpander for event streaming & agent management +* Kubernetes (kubectl CLI) + +--- + +## Setup and Installation + +Ensure you have Python 3.12 or later installed on your system. + +### Clone the repository + +```bash +git clone <repository-url> +cd sre-agent-xpander.ai +``` + +### Create & activate virtual environment + +```bash +python3.12 -m venv .venv +source .venv/bin/activate # macOS/Linux +.venv\Scripts\activate # Windows +``` + +### Install dependencies + +```bash +pip install -r requirements.txt +``` + +### Configure credentials + +```bash +cp .env.example .env +cp xpander_config.json.example xpander_config.json +``` + +**Configure `.env` file for OpenAI:** +```bash +OPENAI_API_KEY=your_openai_api_key_here +``` + +**Configure `xpander_config.json` for Xpander credentials:** +```json +{ + "agent_id": "your_xpander_agent_id", + "api_key": "your_xpander_api_key", + "org_id": "your_xpander_org_id", + "base_url": "https://agent-controller.xpander.ai" +} +``` + +## Xpander Agent Configuration + +Follow these steps to configure your Xpander agent: + +1. Sign in to the Xpander dashboard at [https://app.xpander.ai](https://app.xpander.ai) +2. 
Create a new agent (or select an existing one) and note its **Agent ID** and **Organization ID** +3. Go to the **API Keys** section and generate a new API key, or use the default one +4. Copy the key and update your `xpander_config.json` file as shown in the example above. + +--- + +## Run the project + +### CLI Mode + +```bash +python sre_agent.py +``` + +Type your queries at the `➜ ` prompt and enter `exit` or `quit` to stop. + +### Event Listener Mode + +```bash +python xpander_handler.py +``` + +Incoming messages are forwarded to the SREAgent, any detected `kubectl` commands are run live, and responses are streamed back via SSE. + +--- + +## 📬 Stay Updated with Our Newsletter! + +**Get a FREE Data Science eBook** 📖 with 150+ essential lessons in Data Science when you subscribe to our newsletter! Stay in the loop with the latest tutorials, insights, and exclusive resources. [Subscribe now!](https://join.dailydoseofds.com) + +[![Daily Dose of Data Science Newsletter](https://github.com/patchy631/ai-engineering/blob/main/resources/join_ddods.png)](https://join.dailydoseofds.com) + +--- + +## Contribution + +Contributions are welcome! Please fork the repository, create a feature branch, and submit a Pull Request with your improvements. 
"""Junior SRE agent: an Agno LLM agent whose kubectl replies are executed locally."""
import asyncio
import json
import logging
import re
import shlex
import subprocess
from pathlib import Path
from typing import Any, Optional

from agno.agent import Agent
from agno.models.openai import OpenAIChat
from agno.tools.mcp import MultiMCPTools
from agno.tools.thinking import ThinkingTools
from dotenv import load_dotenv
from xpander_utils.sdk.adapters import AgnoAdapter

# set up logging
t_logger = logging.getLogger(__name__)

# A kubectl command must begin its own line (optionally after a "$ " shell
# prompt or an opening inline backtick). Anchoring to the line start keeps a
# passing mention in prose ("do not run kubectl delete ...") from being run.
KUBECTL = re.compile(
    r"^[ \t]*(?:\$[ \t]*)?`?kubectl[ \t]+([^\n`]+)",
    re.IGNORECASE | re.MULTILINE,
)
# A fenced block; group(1) is the body. The info string ("bash", "sh", ...) is
# only consumed when a newline follows it, so a one-line fence such as
# ```kubectl get pods``` keeps its content.
FENCE = re.compile(r"```(?:[^\n`]*\n)?([\s\S]*?)```")


def _extract_kubectl_flags(text: str) -> Optional[str]:
    """Return the arguments of the first kubectl command in *text*, or None."""
    # Unwrap fences onto their own lines so a fenced command is line-anchored.
    unfenced = FENCE.sub(lambda m: "\n" + m.group(1) + "\n", text)
    found = KUBECTL.search(unfenced)
    if found is None:
        return None
    return found.group(1).strip() or None


class LocalKubectlTool(MultiMCPTools):
    """Runs kubectl on the local machine against a fixed kube context.

    The instance is handed to the Agent in its ``tools`` list, and
    ``SREAgent.run`` also calls :meth:`kubectl` directly.
    """

    name = "kubectl"

    def __init__(self) -> None:
        super().__init__([self.name], env={})
        # capture context once, so every later call targets the same cluster
        try:
            probe = subprocess.run(
                ["kubectl", "config", "current-context"],
                capture_output=True,
                text=True,
                check=False,
            )
            self.ctx = probe.stdout.strip()
        except OSError:
            # kubectl is missing or not executable: defer the failure to the
            # first kubectl() call instead of crashing agent construction.
            t_logger.warning("kubectl not found; no context captured")
            self.ctx = ""

    def kubectl(self, flags: str, *, timeout: Optional[float] = 60.0) -> str:
        """Run ``kubectl <flags>`` and return its output as text.

        Args:
            flags: Everything after the ``kubectl`` word, as one string.
                Shell-style quoting is honoured (e.g. ``-o jsonpath='{..}'``).
            timeout: Seconds to wait before giving up; ``None`` waits forever.
                Guards against streaming commands such as ``get -w``.

        Returns:
            stdout when the command exits 0, otherwise stderr. Parse errors,
            a missing binary and timeouts are reported as a message string
            rather than raised.
        """
        # NOTE(security): *flags* originates from model output, so any kubectl
        # verb the model emits (including mutating ones) runs with the local
        # user's credentials. Consider an allow-list of read-only verbs.
        try:
            args = shlex.split(flags)
        except ValueError as exc:  # e.g. unbalanced quotes
            return f"kubectl: could not parse arguments: {exc}"

        cmd = ["kubectl"]
        if self.ctx:
            cmd += ["--context", self.ctx]
        cmd += args

        try:
            proc = subprocess.run(
                cmd, capture_output=True, text=True, check=False, timeout=timeout
            )
        except FileNotFoundError:
            return "kubectl: executable not found on PATH"
        except subprocess.TimeoutExpired:
            return f"kubectl: command timed out after {timeout} seconds"
        return proc.stdout if proc.returncode == 0 else proc.stderr


class SREAgent:
    """Wraps an Agno Agent and executes the kubectl command it replies with."""

    def __init__(self, adapter: AgnoAdapter) -> None:
        self.adapter = adapter
        self.agent: Optional[Agent] = None  # built lazily on first run()
        self.ktool = LocalKubectlTool()

    async def run(
        self,
        message: str,
        *,
        user_id: str,
        session_id: str,
        cli: bool = False
    ) -> Any:
        """Send *message* to the LLM and run any kubectl command in its reply.

        Args:
            message: The user's query.
            user_id: Passed through to the Agno agent.
            session_id: Passed through to the Agno agent.
            cli: Accepted for backward compatibility. Both modes now use
                ``arun`` because the response object is post-processed here
                and the CLI caller prints ``.content`` itself.

        Returns:
            The agent's response object. When the reply contained a kubectl
            command, ``content`` is replaced by that command's output.
        """
        # initialize LLM agent if needed
        if not self.agent:
            self.agent = self.build_agent()

        # get AI response
        resp = await self.agent.arun(message, user_id=user_id, session_id=session_id)

        content = getattr(resp, "content", None)
        if not isinstance(content, str):
            # nothing textual to scan (e.g. empty reply)
            return resp

        flags = _extract_kubectl_flags(content)
        if flags:
            # subprocess.run blocks; keep it off the event loop
            resp.content = await asyncio.to_thread(self.ktool.kubectl, flags)
            t_logger.info("ran kubectl %s", flags)
        return resp

    def build_agent(self) -> Agent:
        """Create the Agno agent from the adapter's prompt and storage."""
        prompt = self.adapter.get_system_prompt()
        if prompt is None:
            base = []
        elif isinstance(prompt, str):
            base = [prompt]
        else:
            base = list(prompt)
        instr = base + [
            "When user asks about Kubernetes, reply with a kubectl command.",
            "Always run commands to fetch live data."
        ]
        return Agent(
            model=OpenAIChat(id="gpt-4o"),
            tools=[ThinkingTools(add_instructions=True), self.ktool],
            instructions=instr,
            storage=self.adapter.storage,
            markdown=True,
            add_history_to_messages=True
        )


async def _cli() -> None:
    """Interactive prompt loop; type ``exit`` or ``quit`` (or send EOF) to stop."""
    load_dotenv()
    cfg = json.loads(Path("xpander_config.json").read_text(encoding="utf-8"))
    backend = await asyncio.to_thread(
        AgnoAdapter,
        agent_id=cfg["agent_id"], api_key=cfg["api_key"], base_url=cfg.get("base_url")
    )
    agent = SREAgent(backend)
    while True:
        try:
            text = input("➜ ").strip()
        except EOFError:
            # Ctrl-D / closed stdin ends the session cleanly
            print()
            break
        if not text:
            continue  # don't spend an LLM call on a blank line
        if text.lower() in {"exit", "quit"}:
            break
        print((await agent.run(text, user_id="cli", session_id="dev", cli=True)).content)


if __name__ == "__main__":
    asyncio.run(_cli())
"https://agent-controller.xpander.ai" + } + \ No newline at end of file diff --git a/sre-agent-xpander.ai/xpander_handler.py b/sre-agent-xpander.ai/xpander_handler.py new file mode 100644 index 000000000..32595fd7a --- /dev/null +++ b/sre-agent-xpander.ai/xpander_handler.py @@ -0,0 +1,61 @@ +import asyncio +import json +import logging +from pathlib import Path +from dotenv import load_dotenv + +# Disable UNIX-only signal handlers on Windows +import asyncio as _asyncio +_asyncio.AbstractEventLoop.add_signal_handler = lambda *a, **k: None + +from xpander_utils.events import XpanderEventListener, AgentExecutionResult, AgentExecution +from xpander_utils.sdk.adapters import AgnoAdapter +from sre_agent import SREAgent + +# Init +load_dotenv() +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +cfg = json.loads(Path("xpander_config.json").read_text()) + +# Management-API client +xp_adapter = asyncio.run( + asyncio.to_thread( + AgnoAdapter, + agent_id=cfg["agent_id"], + api_key=cfg["api_key"], + ) +) + +agent = SREAgent(xp_adapter) + +# Execution callback (forward only) +async def handle_execution_request(task: AgentExecution) -> AgentExecutionResult: + try: + # Optional: register task for Xpander metrics + await asyncio.to_thread( + xp_adapter.agent.init_task, + execution=task.model_dump() + ) + + resp = await agent.run( + message=task.input.text, + user_id=task.input.user.id, + session_id=task.memory_thread_id, + cli=False, + ) + return AgentExecutionResult(result=resp.content, is_success=True) + + except Exception as exc: + logger.exception("Error handling execution request") + return AgentExecutionResult(result=str(exc), is_success=False) + +# Start SSE listener +listener = XpanderEventListener( + api_key = cfg["api_key"], + organization_id = cfg["org_id"], + agent_id = cfg["agent_id"], + base_url = cfg["base_url"], +) +listener.register(on_execution_request=handle_execution_request)