Class: VectorMCP::Transport::SSE
- Inherits: Object
  - Object
  - VectorMCP::Transport::SSE
- Defined in: lib/vector_mcp/transport/sse.rb
Overview
Implements the Model Context Protocol transport over HTTP using Server-Sent Events (SSE)
for server-to-client messages and HTTP POST for client-to-server messages.
This transport uses the async and falcon gems for an event-driven, non-blocking I/O model.
It provides two main HTTP endpoints:
- SSE Endpoint (<path_prefix>/sse): Clients connect here via GET to establish an SSE stream. The server sends an initial "event: endpoint" with a unique URL for the client to POST messages back. Subsequent messages from the server (responses, notifications) are sent as "event: message".
- Message Endpoint (<path_prefix>/message): Clients POST JSON-RPC messages here. The session_id (obtained from the SSE endpoint event) must be included as a query parameter. The server responds with a 202 Accepted and then sends the actual JSON-RPC response/error asynchronously over the client's established SSE stream.
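The handshake described above can be sketched from the client's side. This is a minimal illustration rather than VectorMCP code: parse_sse_event is a hypothetical helper, and the sample data payload (a relative URL carrying session_id) is an assumption about what the endpoint event contains.

```ruby
require "uri"

# Hypothetical helper (not part of VectorMCP): parses one SSE event block
# into its event name and data payload.
def parse_sse_event(raw)
  event = { name: "message", data: +"" } # SSE defaults the event type to "message"
  raw.each_line(chomp: true) do |line|
    field, value = line.split(":", 2)
    value = value.to_s.sub(/\A /, "") # one leading space after the colon is dropped
    case field
    when "event" then event[:name] = value
    when "data"
      event[:data] << "\n" unless event[:data].empty?
      event[:data] << value
    end
  end
  event
end

# Assumed shape of the first event on the stream; the real payload may differ.
raw = "event: endpoint\ndata: /mcp/message?session_id=abc123\n"
event = parse_sse_event(raw)

# Extract the session_id query parameter that must accompany each POST.
query = URI.parse(event[:data]).query
session_id = URI.decode_www_form(query).to_h["session_id"]
```

A client would then POST its JSON-RPC requests to the URL in event[:data] and read the responses from the same SSE stream as "event: message" entries.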
Defined Under Namespace
Classes: ClientConnection
Instance Attribute Summary
- #host ⇒ String (readonly): The hostname or IP address the server will bind to.
- #logger ⇒ Logger (readonly): The logger instance, shared with the server.
- #path_prefix ⇒ String (readonly): The base URL path for MCP endpoints (e.g., "/mcp").
- #port ⇒ Integer (readonly): The port number the server will listen on.
- #server ⇒ VectorMCP::Server (readonly): The server instance this transport is bound to.
Instance Method Summary
- #broadcast_notification(method, params = nil) ⇒ void: Broadcasts a JSON-RPC notification to all currently connected client sessions.
- #build_rack_app(session = nil) ⇒ self: Provides compatibility for tests that expect a build_rack_app helper.
- #call(env) ⇒ Array(Integer, Hash, Object): Handles incoming HTTP requests.
- #initialize(server, options = {}) ⇒ SSE (constructor): Initializes a new SSE transport.
- #run ⇒ void: Starts the SSE transport, creating a shared session and launching the Falcon server.
- #send_notification(session_id, method, params = nil) ⇒ Boolean: Sends a JSON-RPC notification to a specific client session via its SSE stream.
Constructor Details
#initialize(server, options = {}) ⇒ SSE
Initializes a new SSE transport.
# File 'lib/vector_mcp/transport/sse.rb', line 59

def initialize(server, options = {})
  @server = server
  @logger = server.logger
  @host = options[:host] || "localhost"
  @port = options[:port] || 8000
  prefix = options[:path_prefix] || "/mcp"
  @path_prefix = prefix.start_with?("/") ? prefix : "/#{prefix}"
  @path_prefix = @path_prefix.delete_suffix("/")
  @sse_path = "#{@path_prefix}/sse"
  @message_path = "#{@path_prefix}/message"

  @clients = {} # Thread-safe storage: session_id -> ClientConnection
  @clients_mutex = Mutex.new
  @session = nil # Global session for this transport instance, initialized in run
  logger.debug { "SSE Transport initialized with prefix: #{@path_prefix}, SSE path: #{@sse_path}, Message path: #{@message_path}" }
end
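The prefix handling in the constructor is easy to misread, so here is a small standalone sketch of the same two steps (add a leading slash if missing, drop one trailing slash). The helper names normalize_prefix and endpoint_paths are hypothetical and exist only for this illustration.

```ruby
# Hypothetical helper mirroring the normalization performed in #initialize.
def normalize_prefix(prefix)
  prefix = "/#{prefix}" unless prefix.start_with?("/")
  prefix.delete_suffix("/") # removes at most one trailing slash
end

# Derives the two endpoint paths from a (possibly unnormalized) prefix.
def endpoint_paths(prefix = "/mcp")
  base = normalize_prefix(prefix)
  { sse: "#{base}/sse", message: "#{base}/message" }
end

default_paths = endpoint_paths         # prefix defaults to "/mcp"
relative_paths = endpoint_paths("api/") # leading slash added, trailing slash dropped
root_paths = endpoint_paths("/")        # prefix collapses to the empty string
```

Note that a prefix of "/" normalizes to an empty string, which places the endpoints at /sse and /message, and that only a single trailing slash is removed.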
Instance Attribute Details
#host ⇒ String (readonly)
The hostname or IP address the server will bind to.
# File 'lib/vector_mcp/transport/sse.rb', line 42

def host
  @host
end
#logger ⇒ Logger (readonly)
The logger instance, shared with the server.
# File 'lib/vector_mcp/transport/sse.rb', line 42

def logger
  @logger
end
#path_prefix ⇒ String (readonly)
The base URL path for MCP endpoints (e.g., "/mcp").
# File 'lib/vector_mcp/transport/sse.rb', line 42

def path_prefix
  @path_prefix
end
#port ⇒ Integer (readonly)
The port number the server will listen on.
# File 'lib/vector_mcp/transport/sse.rb', line 42

def port
  @port
end
#server ⇒ VectorMCP::Server (readonly)
The server instance this transport is bound to.
# File 'lib/vector_mcp/transport/sse.rb', line 42

def server
  @server
end
Instance Method Details
#broadcast_notification(method, params = nil) ⇒ void
This method returns an undefined value.
Broadcasts a JSON-RPC notification to all currently connected client sessions.
# File 'lib/vector_mcp/transport/sse.rb', line 130

def broadcast_notification(method, params = nil)
  logger.debug { "Broadcasting notification '#{method}' to #{@clients.size} client(s)" }
  @clients_mutex.synchronize do
    @clients.each_key do |sid|
      send_notification(sid, method, params)
    end
  end
end
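The broadcast pattern (iterate a mutex-guarded session table and deliver to each entry) can be sketched in isolation. ClientRegistry below is a hypothetical class, not VectorMCP's implementation; it uses plain arrays as outbound queues and, as a design choice of this sketch only, snapshots the session ids under the lock and delivers outside it, since Ruby's Mutex is not reentrant.

```ruby
# Hypothetical registry for illustration; not VectorMCP's ClientConnection handling.
class ClientRegistry
  def initialize
    @clients = {} # session_id -> Array used as an outbound queue
    @mutex = Mutex.new
  end

  def register(session_id)
    @mutex.synchronize { @clients[session_id] = [] }
  end

  # Returns true if the session exists and the message was queued.
  def send_to(session_id, message)
    @mutex.synchronize do
      queue = @clients[session_id]
      return false unless queue

      queue << message
      true
    end
  end

  # Snapshot the ids under the lock, then deliver outside it so that
  # send_to can take the same (non-reentrant) mutex again.
  def broadcast(message)
    ids = @mutex.synchronize { @clients.keys }
    ids.count { |sid| send_to(sid, message) }
  end
end

registry = ClientRegistry.new
registry.register("a")
registry.register("b")
delivered = registry.broadcast({ jsonrpc: "2.0", method: "example/ping" })
```

Here broadcast returns the number of sessions that accepted the message, whereas the documented #broadcast_notification returns no meaningful value.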
#build_rack_app(session = nil) ⇒ self
Provides compatibility for tests that expect a build_rack_app helper.
Since the transport itself is a Rack app (defines #call), it returns self.

# File 'lib/vector_mcp/transport/sse.rb', line 144

def build_rack_app(session = nil)
  @session = session if session # Used by some tests to inject a specific session
  self
end
#call(env) ⇒ Array(Integer, Hash, Object)
Handles incoming HTTP requests. This is the entry point for the Rack application. It routes requests to the appropriate handler based on the path.
# File 'lib/vector_mcp/transport/sse.rb', line 97

def call(env)
  start_time = Time.now
  path, http_method = extract_path_and_method(env)
  logger.info "Received #{http_method} request for #{path}"

  status, headers, body = route_request(path, env)

  log_response(http_method, path, start_time, status)
  [status, headers, body]
rescue StandardError => e
  # Generic error handling for issues within the call chain itself
  handle_call_error(http_method, path, e)
end
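Because #call follows the Rack convention (an env Hash in, a [status, headers, body] triplet out), the path-based routing can be illustrated with a tiny standalone object. TinyRouter is hypothetical: the body of route_request is not shown on this page, so the routing table and the response bodies below are assumptions; only the 202 status for the message endpoint comes from the overview.

```ruby
# Hypothetical stand-in for the transport's routing; the real logic lives in
# route_request, which is not reproduced here.
class TinyRouter
  def initialize(path_prefix = "/mcp")
    @sse_path = "#{path_prefix}/sse"
    @message_path = "#{path_prefix}/message"
  end

  # Rack entry point: takes the env Hash, returns [status, headers, body].
  def call(env)
    http_method = env["REQUEST_METHOD"]
    path = env["PATH_INFO"]

    case [http_method, path]
    when ["GET", @sse_path]
      # Placeholder body; a real SSE response streams events over time.
      [200, { "content-type" => "text/event-stream" }, []]
    when ["POST", @message_path]
      # Placeholder body; the overview only specifies the 202 status.
      [202, { "content-type" => "application/json" }, ["{}"]]
    else
      [404, { "content-type" => "text/plain" }, ["Not Found"]]
    end
  end
end

app = TinyRouter.new
status, _headers, _body = app.call({ "REQUEST_METHOD" => "POST", "PATH_INFO" => "/mcp/message" })
```

Any object with this #call shape can be handed to a Rack-compatible server, which is why #build_rack_app can simply return self.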
#run ⇒ void
This method returns an undefined value.
Starts the SSE transport, creating a shared session and launching the Falcon server. This method will block until the server is stopped (e.g., via SIGINT/SIGTERM).
# File 'lib/vector_mcp/transport/sse.rb', line 81

def run
  logger.info("Starting server with async SSE transport on #{@host}:#{@port}")
  create_session
  start_async_server
rescue StandardError => e
  handle_fatal_error(e) # Logs and exits
end
#send_notification(session_id, method, params = nil) ⇒ Boolean
Sends a JSON-RPC notification to a specific client session via its SSE stream.
# File 'lib/vector_mcp/transport/sse.rb', line 119

def send_notification(session_id, method, params = nil)
  = { jsonrpc: "2.0", method: method }
  [:params] = params if params
  (session_id, )
end
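The payload built by #send_notification is a JSON-RPC 2.0 notification: it has no id, and params is left out entirely when nil. The sketch below reproduces that shape with hypothetical helpers (build_notification, to_sse_frame) and hypothetical local names; the SSE framing is an assumption based on the "event: message" convention described in the overview, and the method names are only examples.

```ruby
require "json"

# Hypothetical helper: builds a JSON-RPC 2.0 notification. Notifications carry
# no "id", and "params" is omitted when nil.
def build_notification(method, params = nil)
  message = { jsonrpc: "2.0", method: method }
  message[:params] = params if params
  message
end

# Hypothetical helper: frames a payload as an SSE "message" event
# (assumed wire format).
def to_sse_frame(message)
  "event: message\ndata: #{JSON.generate(message)}\n\n"
end

with_params = build_notification("example/progress", { percent: 50 })
without_params = build_notification("example/list_changed")
frame = to_sse_frame(without_params)
```

The blank line terminating the frame is what tells an SSE client that the event is complete.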