Transports

Transports in the Model Context Protocol (MCP) provide the foundation for communication between clients and servers. A transport handles the underlying mechanics of how messages are sent and received.

Message Format

MCP uses JSON-RPC 2.0 as its wire format. The transport layer is responsible for converting MCP protocol messages into JSON-RPC format for transmission and converting received JSON-RPC messages back into MCP protocol messages.

There are three types of JSON-RPC messages used:

Requests

{
  jsonrpc: "2.0",
  id: number | string,
  method: string,
  params?: object
}

Responses

{
  jsonrpc: "2.0",
  id: number | string,
  result?: object,
  error?: {
    code: number,
    message: string,
    data?: unknown
  }
}

Notifications

{
  jsonrpc: "2.0",
  method: string,
  params?: object
}

Built-in Transport Types

MCP includes two standard transport implementations:

Standard Input/Output (stdio)

The stdio transport enables communication through standard input and output streams. This is particularly useful for local integrations and command-line tools.

Use stdio when:

  • Building command-line tools
  • Implementing local integrations
  • Needing simple process communication
  • Working with shell scripts
const transport = new StdioServerTransport();
await server.connect(transport);
```
const transport = new StdioClientTransport({
  command: "./server",
  args: ["--option", "value"]
});
await client.connect(transport);
```
async with stdio_server() as streams:
    await app.run(
        streams[0],
        streams[1],
        app.create_initialization_options()
    )
```
async with stdio_client(params) as streams:
    async with ClientSession(streams[0], streams[1]) as session:
        await session.initialize()
```

Server-Sent Events (SSE)

SSE transport enables server-to-client streaming with HTTP POST requests for client-to-server communication.

Use SSE when:

  • Only server-to-client streaming is needed
  • Working with restricted networks
  • Implementing simple updates
const app = express();

const server = new Server({
  name: "example-server",
  version: "1.0.0"
}, {
  capabilities: {}
});

let transport: SSEServerTransport | null = null;

app.get("/sse", (req, res) => {
  transport = new SSEServerTransport("/messages", res);
  server.connect(transport);
});

app.post("/messages", (req, res) => {
  if (transport) {
    transport.handlePostMessage(req, res);
  }
});

app.listen(3000);
```
const transport = new SSEClientTransport(
  new URL("http://localhost:3000/sse")
);
await client.connect(transport);
```
app = Server("example-server")
sse = SseServerTransport("/messages")

async def handle_sse(scope, receive, send):
    async with sse.connect_sse(scope, receive, send) as streams:
        await app.run(streams[0], streams[1], app.create_initialization_options())

async def handle_messages(scope, receive, send):
    await sse.handle_post_message(scope, receive, send)

starlette_app = Starlette(
    routes=[
        Route("/sse", endpoint=handle_sse),
        Route("/messages", endpoint=handle_messages, methods=["POST"]),
    ]
)
```

Custom Transports

MCP makes it easy to implement custom transports for specific needs. Any transport implementation just needs to conform to the Transport interface:

You can implement custom transports for:

  • Custom network protocols
  • Specialized communication channels
  • Integration with existing systems
  • Performance optimization
  // Send a JSON-RPC message
  send(message: JSONRPCMessage): Promise<void>;

  // Close the connection
  close(): Promise<void>;

  // Callbacks
  onclose?: () => void;
  onerror?: (error: Error) => void;
  onmessage?: (message: JSONRPCMessage) => void;
}
```
    Args:
        read_stream: Stream to read incoming messages from
        write_stream: Stream to write outgoing messages to
    """
    async with anyio.create_task_group() as tg:
        try:
            # Start processing messages
            tg.start_soon(lambda: process_messages(read_stream))

            # Send messages
            async with write_stream:
                yield write_stream

        except Exception as exc:
            # Handle errors
            raise exc
        finally:
            # Clean up
            tg.cancel_scope.cancel()
            await write_stream.aclose()
            await read_stream.aclose()
```

Error Handling

Transport implementations should handle various error scenarios:

  1. Connection errors
  2. Message parsing errors
  3. Protocol errors
  4. Network timeouts
  5. Resource cleanup

Example error handling:

  async send(message: JSONRPCMessage) {
    try {
      // Sending logic
    } catch (error) {
      this.onerror?.(new Error(`Failed to send message: ${error}`));
      throw error;
    }
  }
}
```
        async def message_handler():
            try:
                async with read_stream_writer:
                    # Message handling logic
                    pass
            except Exception as exc:
                logger.error(f"Failed to handle message: {exc}")
                raise exc

        async with anyio.create_task_group() as tg:
            tg.start_soon(message_handler)
            try:
                # Yield streams for communication
                yield read_stream, write_stream
            except Exception as exc:
                logger.error(f"Transport error: {exc}")
                raise exc
            finally:
                tg.cancel_scope.cancel()
                await write_stream.aclose()
                await read_stream.aclose()
    except Exception as exc:
        logger.error(f"Failed to initialize transport: {exc}")
        raise exc
```

Best Practices

When implementing or using MCP transport:

  1. Handle connection lifecycle properly
  2. Implement proper error handling
  3. Clean up resources on connection close
  4. Use appropriate timeouts
  5. Validate messages before sending
  6. Log transport events for debugging
  7. Implement reconnection logic when appropriate
  8. Handle backpressure in message queues
  9. Monitor connection health
  10. Implement proper security measures

Security Considerations

When implementing transport:

Authentication and Authorization

  • Implement proper authentication mechanisms
  • Validate client credentials
  • Use secure token handling
  • Implement authorization checks

Data Security

  • Use TLS for network transport
  • Encrypt sensitive data
  • Validate message integrity
  • Implement message size limits
  • Sanitize input data

Network Security

  • Implement rate limiting
  • Use appropriate timeouts
  • Handle denial of service scenarios
  • Monitor for unusual patterns
  • Implement proper firewall rules

Debugging Transport

Tips for debugging transport issues:

  1. Enable debug logging
  2. Monitor message flow
  3. Check connection states
  4. Validate message formats
  5. Test error scenarios
  6. Use network analysis tools
  7. Implement health checks
  8. Monitor resource usage
  9. Test edge cases
  10. Use proper error tracking