Class: VectorMCP::Transport::SSE
Inherits: Object
- Object
  - VectorMCP::Transport::SSE
Defined in:
- lib/vector_mcp/transport/sse.rb
- lib/vector_mcp/transport/sse/puma_config.rb
- lib/vector_mcp/transport/sse/stream_manager.rb
- lib/vector_mcp/transport/sse/message_handler.rb
- lib/vector_mcp/transport/sse/client_connection.rb
Overview
Implements the Model Context Protocol transport over HTTP using Server-Sent Events (SSE) for server-to-client messages and HTTP POST for client-to-server messages. This transport uses Puma as the HTTP server with Ruby threading for concurrency.
It provides two main HTTP endpoints:
- SSE Endpoint (<path_prefix>/sse): Clients connect here via GET to establish an SSE stream. The server sends an initial event: endpoint with a unique URL for the client to POST messages back. Subsequent messages from the server (responses, notifications) are sent as event: message.
- Message Endpoint (<path_prefix>/message): Clients POST JSON-RPC messages here. The session_id (obtained from the SSE endpoint event) must be included as a query parameter. The server responds with 202 Accepted and then sends the actual JSON-RPC response or error asynchronously over the client's established SSE stream.
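The wire format described above can be sketched as follows. This is a minimal illustration, not the transport's own code: the helpers `format_sse_event` and `message_url` are hypothetical names, and the UUID session ID is an assumption about what a session identifier might look like. Only the `event:`/`data:` framing (from the Server-Sent Events format) and the `session_id` query parameter come from the description.

```ruby
require "json"
require "securerandom"

# Hypothetical helper: formats one Server-Sent Events frame.
# A frame is an "event:" line, a "data:" line, and a blank line.
def format_sse_event(event, data)
  "event: #{event}\ndata: #{data}\n\n"
end

# Hypothetical helper: builds the URL the client POSTs messages to.
def message_url(path_prefix, session_id)
  "#{path_prefix}/message?session_id=#{session_id}"
end

# Assumption: the session ID is an opaque unique string such as a UUID.
session_id = SecureRandom.uuid

# First frame on a new stream: tells the client where to POST.
endpoint_frame = format_sse_event("endpoint", message_url("/mcp", session_id))

# Later frames: JSON-RPC responses or notifications, one JSON document per frame.
response = { jsonrpc: "2.0", id: 1, result: {} }
message_frame = format_sse_event("message", JSON.generate(response))

puts endpoint_frame
puts message_frame
```

Keeping each JSON payload on a single `data:` line avoids having to split the payload across multiple `data:` lines, which the SSE format would otherwise require for embedded newlines.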
Defined Under Namespace
Classes: ClientConnection, MessageHandler, PumaConfig, StreamManager
Instance Attribute Summary
- #host ⇒ String (readonly)
  The hostname or IP address the server will bind to.
- #logger ⇒ Logger (readonly)
  The logger instance, shared with the server.
- #path_prefix ⇒ String (readonly)
  The base URL path for MCP endpoints (e.g., "/mcp").
- #port ⇒ Integer (readonly)
  The port number the server will listen on.
- #server ⇒ VectorMCP::Server (readonly)
  The server instance this transport is bound to.
Instance Method Summary
- #broadcast_notification(method, params = nil) ⇒ void
  Broadcasts a JSON-RPC notification to all currently connected client sessions.
- #build_rack_app(session = nil) ⇒ self
  Provides compatibility for tests that expect a build_rack_app helper.
- #call(env) ⇒ Array(Integer, Hash, Object)
  Handles incoming HTTP requests.
- #initialize(server, options = {}) ⇒ SSE (constructor)
  Initializes a new SSE transport.
- #run ⇒ void
  Starts the SSE transport, creating a shared session and launching the Puma server.
- #send_notification(session_id, method, params = nil) ⇒ Boolean
  Sends a JSON-RPC notification to a specific client session via its SSE stream.
- #stop ⇒ Object
  Stops the transport and cleans up resources.
Constructor Details
#initialize(server, options = {}) ⇒ SSE
Initializes a new SSE transport.
    # File 'lib/vector_mcp/transport/sse.rb', line 53
    def initialize(server, options = {})
      @server = server
      @logger = server.logger
      @host = options[:host] || "localhost"
      @port = options[:port] || 8000
      prefix = options[:path_prefix] || "/mcp"
      @path_prefix = prefix.start_with?("/") ? prefix : "/#{prefix}"
      @path_prefix = @path_prefix.delete_suffix("/")
      @sse_path = "#{@path_prefix}/sse"
      @message_path = "#{@path_prefix}/message"

      # Thread-safe client storage using concurrent-ruby
      @clients = Concurrent::Hash.new
      @session = nil # Global session for this transport instance, initialized in run
      @puma_server = nil
      @running = false

      logger.debug { "SSE Transport initialized with prefix: #{@path_prefix}, SSE path: #{@sse_path}, Message path: #{@message_path}" }
    end
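The path prefix handling in the constructor can be tried in isolation. The sketch below copies the two normalization steps into a standalone function; `normalize_prefix` is a hypothetical name used only for illustration and is not part of the library.

```ruby
# Hypothetical standalone version of the constructor's prefix handling.
def normalize_prefix(prefix)
  prefix ||= "/mcp"                                  # same default as the constructor
  prefix = prefix.start_with?("/") ? prefix : "/#{prefix}"
  prefix.delete_suffix("/")                          # removes at most one trailing slash
end

puts normalize_prefix(nil)     # prints "/mcp"
puts normalize_prefix("api")   # prints "/api"
puts normalize_prefix("/api/") # prints "/api"

# A bare "/" normalizes to the empty string, so the endpoints
# would then sit at "/sse" and "/message".
puts normalize_prefix("/").inspect # prints ""
```

Because `delete_suffix` strips only a single trailing slash, a prefix such as "/mcp//" would keep one slash and produce endpoint paths containing a double slash.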
Instance Attribute Details
#host ⇒ String (readonly)
The hostname or IP address the server will bind to.
    # File 'lib/vector_mcp/transport/sse.rb', line 43
    def host
      @host
    end
#logger ⇒ Logger (readonly)
The logger instance, shared with the server.
    # File 'lib/vector_mcp/transport/sse.rb', line 43
    def logger
      @logger
    end
#path_prefix ⇒ String (readonly)
The base URL path for MCP endpoints (e.g., "/mcp").
    # File 'lib/vector_mcp/transport/sse.rb', line 43
    def path_prefix
      @path_prefix
    end
#port ⇒ Integer (readonly)
The port number the server will listen on.
    # File 'lib/vector_mcp/transport/sse.rb', line 43
    def port
      @port
    end
#server ⇒ VectorMCP::Server (readonly)
The server instance this transport is bound to.
    # File 'lib/vector_mcp/transport/sse.rb', line 43
    def server
      @server
    end
Instance Method Details
#broadcast_notification(method, params = nil) ⇒ void
This method returns an undefined value.
Broadcasts a JSON-RPC notification to all currently connected client sessions.
    # File 'lib/vector_mcp/transport/sse.rb', line 130
    def broadcast_notification(method, params = nil)
      logger.debug { "Broadcasting notification '#{method}' to #{@clients.size} client(s)" }
      = { jsonrpc: "2.0", method: method }
      [:params] = params if params

      @clients.each_value do |client_conn|
        StreamManager.(client_conn, )
      end
    end
#build_rack_app(session = nil) ⇒ self
Provides compatibility for tests that expect a build_rack_app helper. Since the transport itself is a Rack app (it defines #call), it returns self.
    # File 'lib/vector_mcp/transport/sse.rb', line 145
    def build_rack_app(session = nil)
      @session = session if session
      self
    end
#call(env) ⇒ Array(Integer, Hash, Object)
Handles incoming HTTP requests. This is the entry point for the Rack application. It routes requests to the appropriate handler based on the path.
    # File 'lib/vector_mcp/transport/sse.rb', line 93
    def call(env)
      start_time = Time.now
      path = env["PATH_INFO"]
      http_method = env["REQUEST_METHOD"]
      logger.info "Received #{http_method} request for #{path}"

      status, headers, body = route_request(path, env)

      log_response(http_method, path, start_time, status)
      [status, headers, body]
    rescue StandardError => e
      handle_call_error(http_method, path, e)
    end
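The Rack entry point pattern used by #call can be illustrated with a much smaller stand-in. The class below is a hypothetical sketch, not the transport's routing code: `TinyRouter`, its status codes for unknown paths and wrong HTTP methods, and the placeholder bodies are all assumptions. It only shows the general shape of dispatching on PATH_INFO and returning a status, headers, body triplet.

```ruby
# Hypothetical minimal Rack-style app that dispatches on path and method.
class TinyRouter
  def initialize(path_prefix = "/mcp")
    @sse_path = "#{path_prefix}/sse"
    @message_path = "#{path_prefix}/message"
  end

  # Rack entry point: receives the env hash, returns [status, headers, body].
  def call(env)
    route(env["PATH_INFO"], env["REQUEST_METHOD"])
  rescue StandardError => e
    # Assumption: unexpected errors become a plain 500 response.
    [500, { "content-type" => "text/plain" }, ["Internal Server Error: #{e.message}"]]
  end

  private

  def route(path, http_method)
    case path
    when @sse_path
      return method_not_allowed("GET") unless http_method == "GET"

      # A real SSE endpoint would return a streaming body here.
      [200, { "content-type" => "text/event-stream" }, []]
    when @message_path
      return method_not_allowed("POST") unless http_method == "POST"

      # 202 Accepted: the real reply travels over the SSE stream.
      [202, { "content-type" => "application/json" }, ["{}"]]
    else
      [404, { "content-type" => "text/plain" }, ["Not Found"]]
    end
  end

  def method_not_allowed(allowed)
    [405, { "content-type" => "text/plain", "allow" => allowed }, ["Method Not Allowed"]]
  end
end

app = TinyRouter.new("/mcp")
status, _headers, _body = app.call("PATH_INFO" => "/mcp/message", "REQUEST_METHOD" => "POST")
puts status
```

Because any object responding to `call(env)` is a valid Rack app, a class of this shape can be handed directly to a Rack-compatible server, which matches the note that the transport returns itself from build_rack_app.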
#run ⇒ void
This method returns an undefined value.
Starts the SSE transport, creating a shared session and launching the Puma server. This method will block until the server is stopped (e.g., via SIGINT/SIGTERM).
    # File 'lib/vector_mcp/transport/sse.rb', line 78
    def run
      logger.info("Starting server with Puma SSE transport on #{@host}:#{@port}")
      create_session
      start_puma_server
    rescue StandardError => e
      handle_fatal_error(e)
    end
#send_notification(session_id, method, params = nil) ⇒ Boolean
Sends a JSON-RPC notification to a specific client session via its SSE stream.
    # File 'lib/vector_mcp/transport/sse.rb', line 115
    def send_notification(session_id, method, params = nil)
      = { jsonrpc: "2.0", method: method }
      [:params] = params if params

      client_conn = @clients[session_id]
      return false unless client_conn

      StreamManager.(client_conn, )
    end
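The notification payload built by this method follows the JSON-RPC 2.0 shape: a `jsonrpc` version, a `method`, optional `params`, and no `id`. The sketch below reproduces only that payload construction; `build_notification` and the local variable name are hypothetical and chosen for illustration, and the example method names are placeholders.

```ruby
require "json"

# Hypothetical helper: builds a JSON-RPC 2.0 notification hash.
# Notifications carry no "id", so the receiver sends no response.
def build_notification(method, params = nil)
  payload = { jsonrpc: "2.0", method: method }
  payload[:params] = params if params # omit the key entirely when there are no params
  payload
end

puts JSON.generate(build_notification("notifications/tools/list_changed"))
# prints {"jsonrpc":"2.0","method":"notifications/tools/list_changed"}

puts JSON.generate(build_notification("notifications/progress", { progress: 50 }))
# prints {"jsonrpc":"2.0","method":"notifications/progress","params":{"progress":50}}
```

Note that the method returns false when no connection exists for the given session ID, so callers can use the return value to detect that a client has already disconnected.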
#stop ⇒ Object
Stops the transport and cleans up resources.
    # File 'lib/vector_mcp/transport/sse.rb', line 151
    def stop
      @running = false
      cleanup_clients
      @puma_server&.stop
      logger.info("SSE transport stopped")
    end