Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 80 additions & 1 deletion src/strands/tools/mcp/mcp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from mcp.types import GetPromptResult, ListPromptsResult
from mcp.types import ImageContent as MCPImageContent
from mcp.types import TextContent as MCPTextContent
from mcp.types import EmbeddedResource as MCPEmbeddedResource

from ...types import PaginatedList
from ...types.exceptions import MCPClientInitializationError
Expand Down Expand Up @@ -428,7 +429,7 @@ def _background_task(self) -> None:

def _map_mcp_content_to_tool_result_content(
self,
content: MCPTextContent | MCPImageContent | Any,
content: MCPTextContent | MCPImageContent | MCPEmbeddedResource | Any,
) -> Union[ToolResultContent, None]:
"""Maps MCP content types to tool result content types.

Expand All @@ -452,6 +453,84 @@ def _map_mcp_content_to_tool_result_content(
"source": {"bytes": base64.b64decode(content.data)},
}
}
elif isinstance(content, MCPEmbeddedResource):
self._log_debug_with_thread("mapping MCP embedded resource content")
resource = getattr(content, "resource", None)
if resource is None:
self._log_debug_with_thread("embedded resource has no 'resource' field - dropping")
return None

text_val = getattr(resource, "text", None)
if text_val:
return {"text": text_val}

blob_val = getattr(resource, "blob", None)
mime_type = getattr(resource, "mimeType", None)

if blob_val is not None:
# blob is a base64 string in current mcp schema
raw_bytes: Optional[bytes]
try:
if isinstance(blob_val, (bytes, bytearray)):
raw_bytes = bytes(blob_val)
elif isinstance(blob_val, str):
raw_bytes = base64.b64decode(blob_val)
else:
raw_bytes = None
except Exception:
raw_bytes = None

if raw_bytes is None:
self._log_debug_with_thread("embedded resource blob could not be decoded - dropping")
return None

def _is_textual(mt: Optional[str]) -> bool:
if not mt:
return False
if mt.startswith("text/"):
return True
textual = (
"application/json",
"application/xml",
"application/javascript",
"application/x-yaml",
"application/yaml",
"application/xhtml+xml",
)
if mt in textual or mt.endswith("+json") or mt.endswith("+xml"):
return True
return False

if _is_textual(mime_type):
try:
return {"text": raw_bytes.decode("utf-8", errors="replace")}
except Exception:
pass

if mime_type in MIME_TO_FORMAT:
return {
"image": {
"format": MIME_TO_FORMAT[mime_type],
"source": {"bytes": raw_bytes},
}
}

self._log_debug_with_thread("embedded resource blob with non-textual/unknown mimeType - dropping")
return None

# Handle URI-only resources
uri = getattr(resource, "uri", None)
if uri:
return {
"json": {
"uri": uri,
"mime_type": mime_type
}
}

# Make sure we return in all paths
self._log_debug_with_thread("embedded resource had no usable text/blob/uri; dropping")
return None
else:
self._log_debug_with_thread("unhandled content type: %s - dropping content", content.__class__.__name__)
return None
Expand Down
128 changes: 128 additions & 0 deletions tests/strands/tools/mcp/test_mcp_client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import time
import base64
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
Expand Down Expand Up @@ -541,3 +542,130 @@ def slow_transport():
assert client._background_thread_session is None
assert client._background_thread_event_loop is None
assert not client._init_future.done() # New future created


def test_call_tool_sync_embedded_nested_text(mock_transport, mock_session):
"""EmbeddedResource.resource (uri + text) should map to plain text content."""
er = {
"type": "resource", # required literal
"resource": {
"uri": "mcp://resource/embedded-text-1",
"text": "inner text",
"mimeType": "text/plain",
},
}
mock_session.call_tool.return_value = MCPCallToolResult(isError=False, content=[er])

from strands.tools.mcp import MCPClient
with MCPClient(mock_transport["transport_callable"]) as client:
result = client.call_tool_sync(tool_use_id="er-text", name="get_file_contents", arguments={})

mock_session.call_tool.assert_called_once_with("get_file_contents", {}, None)
assert result["status"] == "success"
assert len(result["content"]) == 1
assert result["content"][0]["text"] == "inner text"


def test_call_tool_sync_embedded_nested_base64_textual_mime(mock_transport, mock_session):
"""EmbeddedResource.resource (uri + blob with textual MIME) should decode to text."""
import base64
from mcp.types import CallToolResult as MCPCallToolResult
payload = base64.b64encode(b'{"k":"v"}').decode()

er = {
"type": "resource",
"resource": {
"uri": "mcp://resource/embedded-blob-1",
# NOTE: blob is a STRING, mimeType is sibling
"blob": payload,
"mimeType": "application/json",
},
}
mock_session.call_tool.return_value = MCPCallToolResult(isError=False, content=[er])

from strands.tools.mcp import MCPClient
with MCPClient(mock_transport["transport_callable"]) as client:
result = client.call_tool_sync(tool_use_id="er-blob", name="get_file_contents", arguments={})

mock_session.call_tool.assert_called_once_with("get_file_contents", {}, None)
assert result["status"] == "success"
assert len(result["content"]) == 1
assert result["content"][0]["text"] == '{"k":"v"}'


def test_call_tool_sync_embedded_image_blob(mock_transport, mock_session):
"""EmbeddedResource.resource (blob with image MIME) should map to image content."""
import base64
# Create a simple 1x1 PNG image
png_data = base64.b64decode("iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8/5+hHgAHggJ/PchI7wAAAABJRU5ErkJggg==")
payload = base64.b64encode(png_data).decode()

er = {
"type": "resource",
"resource": {
"uri": "mcp://resource/embedded-image",
"blob": payload,
"mimeType": "image/png",
},
}
mock_session.call_tool.return_value = MCPCallToolResult(isError=False, content=[er])

from strands.tools.mcp import MCPClient
with MCPClient(mock_transport["transport_callable"]) as client:
result = client.call_tool_sync(tool_use_id="er-image", name="get_file_contents", arguments={})

mock_session.call_tool.assert_called_once_with("get_file_contents", {}, None)
assert result["status"] == "success"
assert len(result["content"]) == 1
assert "image" in result["content"][0]
assert result["content"][0]["image"]["format"] == "png"
assert "bytes" in result["content"][0]["image"]["source"]


def test_call_tool_sync_embedded_non_textual_blob_dropped(mock_transport, mock_session):
"""EmbeddedResource.resource (blob with non-textual/unknown MIME) should be dropped."""
payload = base64.b64encode(b'\x00\x01\x02\x03').decode()

er = {
"type": "resource",
"resource": {
"uri": "mcp://resource/embedded-binary",
"blob": payload,
"mimeType": "application/octet-stream",
},
}
mock_session.call_tool.return_value = MCPCallToolResult(isError=False, content=[er])

from strands.tools.mcp import MCPClient
with MCPClient(mock_transport["transport_callable"]) as client:
result = client.call_tool_sync(tool_use_id="er-binary", name="get_file_contents", arguments={})

mock_session.call_tool.assert_called_once_with("get_file_contents", {}, None)
assert result["status"] == "success"
assert len(result["content"]) == 0 # Content should be dropped


def test_call_tool_sync_embedded_multiple_textual_mimes(mock_transport, mock_session):
"""EmbeddedResource with different textual MIME types should decode to text."""
import base64

# Test YAML content
yaml_content = base64.b64encode(b'key: value\nlist:\n - item1\n - item2').decode()
er = {
"type": "resource",
"resource": {
"uri": "mcp://resource/embedded-yaml",
"blob": yaml_content,
"mimeType": "application/yaml",
},
}
mock_session.call_tool.return_value = MCPCallToolResult(isError=False, content=[er])

from strands.tools.mcp import MCPClient
with MCPClient(mock_transport["transport_callable"]) as client:
result = client.call_tool_sync(tool_use_id="er-yaml", name="get_file_contents", arguments={})

mock_session.call_tool.assert_called_once_with("get_file_contents", {}, None)
assert result["status"] == "success"
assert len(result["content"]) == 1
assert "key: value" in result["content"][0]["text"]
123 changes: 123 additions & 0 deletions tests_integ/mcp/test_mcp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,129 @@ def transport_callback() -> MCPTransport:
assert "Hello, Charlie!" in prompt_text


def test_mcp_client_embedded_resources():
"""Test that MCP client properly handles EmbeddedResource content types."""
embedded_resource_mcp_client = MCPClient(
lambda: stdio_client(StdioServerParameters(command="python", args=["tests_integ/embedded_resource_server.py"]))
)

with embedded_resource_mcp_client:
# Test text embedded resource
text_result = embedded_resource_mcp_client.call_tool_sync(
tool_use_id="test-embedded-text",
name="get_text_file_content",
arguments={},
)
assert text_result["status"] == "success"
assert len(text_result["content"]) == 1
assert text_result["content"][0]["text"] == "Hello, this is embedded text content!"

# Test JSON embedded resource (blob with textual MIME type)
json_result = embedded_resource_mcp_client.call_tool_sync(
tool_use_id="test-embedded-json",
name="get_json_file_content",
arguments={},
)
assert json_result["status"] == "success"
assert len(json_result["content"]) == 1
json_content = json_result["content"][0]["text"]
assert "Hello from embedded JSON!" in json_content
assert "test" in json_content

# Test YAML embedded resource (blob with textual MIME type)
yaml_result = embedded_resource_mcp_client.call_tool_sync(
tool_use_id="test-embedded-yaml",
name="get_yaml_file_content",
arguments={},
)
assert yaml_result["status"] == "success"
assert len(yaml_result["content"]) == 1
yaml_content = yaml_result["content"][0]["text"]
assert "Hello from embedded YAML!" in yaml_content
assert "item1" in yaml_content

# Test image embedded resource
image_result = embedded_resource_mcp_client.call_tool_sync(
tool_use_id="test-embedded-image",
name="get_image_content",
arguments={},
)
assert image_result["status"] == "success"
assert len(image_result["content"]) == 1
assert "image" in image_result["content"][0]
assert image_result["content"][0]["image"]["format"] == "png"
assert "bytes" in image_result["content"][0]["image"]["source"]

# Test binary embedded resource (should be dropped due to non-textual MIME type)
binary_result = embedded_resource_mcp_client.call_tool_sync(
tool_use_id="test-embedded-binary",
name="get_binary_content",
arguments={},
)
assert binary_result["status"] == "success"
# Binary content should be dropped, so no content returned
assert len(binary_result["content"]) == 0


@pytest.mark.asyncio
async def test_mcp_client_embedded_resources_async():
"""Test that async MCP client properly handles EmbeddedResource content types."""
embedded_resource_mcp_client = MCPClient(
lambda: stdio_client(StdioServerParameters(command="python", args=["tests_integ/embedded_resource_server.py"]))
)

with embedded_resource_mcp_client:
# Test text embedded resource async
text_result = await embedded_resource_mcp_client.call_tool_async(
tool_use_id="test-embedded-text-async",
name="get_text_file_content",
arguments={},
)
assert text_result["status"] == "success"
assert len(text_result["content"]) == 1
assert text_result["content"][0]["text"] == "Hello, this is embedded text content!"

# Test JSON embedded resource async
json_result = await embedded_resource_mcp_client.call_tool_async(
tool_use_id="test-embedded-json-async",
name="get_json_file_content",
arguments={},
)
assert json_result["status"] == "success"
assert len(json_result["content"]) == 1
json_content = json_result["content"][0]["text"]
assert "Hello from embedded JSON!" in json_content


def test_mcp_client_embedded_resources_with_agent():
"""Test that embedded resources work correctly when used with Agent."""
embedded_resource_mcp_client = MCPClient(
lambda: stdio_client(StdioServerParameters(command="python", args=["tests_integ/embedded_resource_server.py"]))
)

with embedded_resource_mcp_client:
tools = embedded_resource_mcp_client.list_tools_sync()
agent = Agent(tools=tools)

# Test that agent can successfully use tools that return embedded resources
result = agent("Get the text file content and tell me what it says")

# Check that the agent successfully processed the embedded resource
assert result.message is not None
response_text = " ".join([
block["text"]
for block in result.message["content"]
if "text" in block
]).lower()

# The agent should have received and processed the embedded text content
assert any([
"hello" in response_text,
"embedded text content" in response_text,
"text content" in response_text
])


def _messages_to_content_blocks(messages: List[Message]) -> List[ToolUse]:
return [block["toolUse"] for message in messages for block in message["content"] if "toolUse" in block]

Expand Down
Loading