Class: VectorMCP::Transport::SSE

Inherits:
Object
Defined in:
lib/vector_mcp/transport/sse.rb

Overview

Implements the Model Context Protocol transport over HTTP using Server-Sent Events (SSE) for server-to-client messages and HTTP POST for client-to-server messages. This transport uses the async and falcon gems for an event-driven, non-blocking I/O model.

It provides two main HTTP endpoints:

  1. SSE Endpoint (<path_prefix>/sse): Clients connect here via GET to establish an SSE stream. The server first sends an "event: endpoint" event carrying a unique URL to which the client POSTs its messages. All later messages from the server (responses, notifications) are sent as "event: message" events.
  2. Message Endpoint (<path_prefix>/message): Clients POST JSON-RPC messages here. The session_id (obtained from the endpoint event on the SSE stream) must be included as a query parameter. The server replies with 202 Accepted and then delivers the actual JSON-RPC response or error asynchronously over the client's established SSE stream.
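
The client side of this handshake can be sketched as follows. This is a minimal illustration using only the Ruby standard library, not part of VectorMCP: the helper parse_sse_frame, the sample frames, and the session id "abc123" are hypothetical, and the exact text the server puts in the endpoint event's data field is an assumption.

```ruby
require "json"
require "uri"

# Parse one SSE frame (the text between blank lines) into its event name and data.
# Hypothetical helper for illustration; not part of VectorMCP.
def parse_sse_frame(frame)
  event = "message" # SSE default when no "event:" field is present
  data_lines = []
  frame.each_line(chomp: true) do |line|
    next if line.empty? || line.start_with?(":") # skip blank lines and comments
    field, value = line.split(":", 2)
    value = value.to_s.delete_prefix(" ")
    case field
    when "event" then event = value
    when "data"  then data_lines << value
    end
  end
  { event: event, data: data_lines.join("\n") }
end

# Assumed shape of the two event types described above.
endpoint_frame = "event: endpoint\ndata: /mcp/message?session_id=abc123\n"
message_frame  = %(event: message\ndata: {"jsonrpc":"2.0","id":1,"result":{}}\n)

# Step 1: the first event tells the client where to POST.
endpoint = parse_sse_frame(endpoint_frame)
post_uri = URI.join("http://localhost:8000", endpoint[:data])

# Step 2: replies to POSTed requests arrive later as "message" events.
reply = JSON.parse(parse_sse_frame(message_frame)[:data])
```

A real client would read frames from the open GET response and POST each JSON-RPC request to post_uri with an HTTP client such as Net::HTTP.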

Examples:

Basic Usage with a Server

server = VectorMCP::Server.new("my-sse-server")
# ... register tools, resources, prompts ...
transport = VectorMCP::Transport::SSE.new(server, port: 8080)
server.run(transport: transport) # or transport.run if server not managing transport lifecycle

Defined Under Namespace

Classes: ClientConnection

Constructor Details

#initialize(server, options = {}) ⇒ SSE

Initializes a new SSE transport.

Parameters:

  • server (VectorMCP::Server)

    The server instance that will handle messages.

  • options (Hash) (defaults to: {})

    Configuration options for the transport.

Options Hash (options):

  • :host (String) — default: "localhost"

    The hostname or IP to bind to.

  • :port (Integer) — default: 8000

    The port to listen on.

  • :path_prefix (String) — default: "/mcp"

    The base path for HTTP endpoints.



# File 'lib/vector_mcp/transport/sse.rb', line 59

def initialize(server, options = {})
  @server = server
  @logger = server.logger
  @host = options[:host] || "localhost"
  @port = options[:port] || 8000
  prefix = options[:path_prefix] || "/mcp"
  @path_prefix = prefix.start_with?("/") ? prefix : "/#{prefix}"
  @path_prefix = @path_prefix.delete_suffix("/")
  @sse_path = "#{@path_prefix}/sse"
  @message_path = "#{@path_prefix}/message"

  @clients = {} # Thread-safe storage: session_id -> ClientConnection
  @clients_mutex = Mutex.new
  @session = nil # Global session for this transport instance, initialized in run
  logger.debug { "SSE Transport initialized with prefix: #{@path_prefix}, SSE path: #{@sse_path}, Message path: #{@message_path}" }
end
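
The prefix handling in the constructor can be tried in isolation. The sketch below copies the two normalization steps into a standalone method; the name normalize_prefix is hypothetical and does not exist in the library.

```ruby
# Standalone copy of the constructor's prefix normalization (hypothetical helper name).
def normalize_prefix(prefix)
  prefix = "/#{prefix}" unless prefix.start_with?("/") # ensure a leading slash
  prefix.delete_suffix("/")                            # drop one trailing slash
end

normalize_prefix("mcp")      # => "/mcp"
normalize_prefix("/api/v1/") # => "/api/v1"
normalize_prefix("/")        # => ""
```

With a prefix of "/", the normalized value is an empty string, so the endpoints become "/sse" and "/message".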

Instance Attribute Details

#host ⇒ String (readonly)

The hostname or IP address the server will bind to.

Returns:

  • (String)

    the current value of host



# File 'lib/vector_mcp/transport/sse.rb', line 42

def host
  @host
end

#logger ⇒ Logger (readonly)

The logger instance, shared with the server.

Returns:

  • (Logger)

    the current value of logger



# File 'lib/vector_mcp/transport/sse.rb', line 42

def logger
  @logger
end

#path_prefix ⇒ String (readonly)

The base URL path for MCP endpoints (e.g., "/mcp").

Returns:

  • (String)

    the current value of path_prefix



# File 'lib/vector_mcp/transport/sse.rb', line 42

def path_prefix
  @path_prefix
end

#port ⇒ Integer (readonly)

The port number the server will listen on.

Returns:

  • (Integer)

    the current value of port



# File 'lib/vector_mcp/transport/sse.rb', line 42

def port
  @port
end

#server ⇒ VectorMCP::Server (readonly)

The server instance this transport is bound to.

Returns:

  • (VectorMCP::Server)

    the current value of server



# File 'lib/vector_mcp/transport/sse.rb', line 42

def server
  @server
end

Instance Method Details

#broadcast_notification(method, params = nil) ⇒ void

This method returns an undefined value.

Broadcasts a JSON-RPC notification to all currently connected client sessions.

Parameters:

  • method (String)

    The method name of the notification.

  • params (Hash, Array, nil) (defaults to: nil)

    The parameters for the notification (optional).



# File 'lib/vector_mcp/transport/sse.rb', line 130

def broadcast_notification(method, params = nil)
  logger.debug { "Broadcasting notification '#{method}' to #{@clients.size} client(s)" }
  @clients_mutex.synchronize do
    @clients.each_key do |sid|
      send_notification(sid, method, params)
    end
  end
end

#build_rack_app(session = nil) ⇒ self

Provides compatibility for tests that expect a build_rack_app helper. Since the transport itself is a Rack app (defines #call), it returns self.

Parameters:

  • session (VectorMCP::Session, nil) (defaults to: nil)

    An optional session to persist for testing.

Returns:

  • (self)

    The transport instance itself.



# File 'lib/vector_mcp/transport/sse.rb', line 144

def build_rack_app(session = nil)
  @session = session if session # Used by some tests to inject a specific session
  self
end

#call(env) ⇒ Array(Integer, Hash, Object)

Handles incoming HTTP requests. This is the entry point for the Rack application. It routes requests to the appropriate handler based on the path.

Parameters:

  • env (Hash, Async::HTTP::Request)

    The Rack environment hash or an Async HTTP request object.

Returns:

  • (Array(Integer, Hash, Object))

    A standard Rack response triplet: [status, headers, body]. The body is typically an Async::HTTP::Body::Writable for SSE or an Array of strings.



# File 'lib/vector_mcp/transport/sse.rb', line 97

def call(env)
  start_time = Time.now
  path, http_method = extract_path_and_method(env)
  logger.info "Received #{http_method} request for #{path}"

  status, headers, body = route_request(path, env)

  log_response(http_method, path, start_time, status)
  [status, headers, body]
rescue StandardError => e
  # Generic error handling for issues within the call chain itself
  handle_call_error(http_method, path, e)
end
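
The path-based routing that #call delegates to is not shown on this page. As a rough sketch of the idea only, a Rack-style object that dispatches on method and path might look like the following. TinyRouter is a hypothetical stand-in, not VectorMCP's route_request; the response bodies, headers, and the 404 fallback are assumptions. Only the 202 status for accepted POSTs comes from the overview.

```ruby
# Minimal Rack-style app using only core Ruby.
# Hypothetical stand-in for illustration; not VectorMCP's actual router.
class TinyRouter
  def initialize(path_prefix = "/mcp")
    @sse_path = "#{path_prefix}/sse"
    @message_path = "#{path_prefix}/message"
  end

  # env is a Rack-style hash; returns the [status, headers, body] triplet.
  def call(env)
    case [env["REQUEST_METHOD"], env["PATH_INFO"]]
    when ["GET", @sse_path]
      # A real transport would return a streaming body here.
      [200, { "content-type" => "text/event-stream" }, []]
    when ["POST", @message_path]
      # The real reply is delivered later over the SSE stream.
      [202, { "content-type" => "text/plain" }, ["Accepted"]]
    else
      [404, { "content-type" => "text/plain" }, ["Not Found"]]
    end
  end
end

app = TinyRouter.new
status, _headers, _body = app.call("REQUEST_METHOD" => "POST", "PATH_INFO" => "/mcp/message")
# status is 202
```

Because any object responding to #call(env) with this triplet is a valid Rack app, the transport can be mounted or exercised in tests without starting a server.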

#run ⇒ void

This method returns an undefined value.

Starts the SSE transport, creating a shared session and launching the Falcon server. This method will block until the server is stopped (e.g., via SIGINT/SIGTERM).

Raises:

  • (StandardError)

    if there's a fatal error during server startup.



# File 'lib/vector_mcp/transport/sse.rb', line 81

def run
  logger.info("Starting server with async SSE transport on #{@host}:#{@port}")
  create_session
  start_async_server
rescue StandardError => e
  handle_fatal_error(e) # Logs and exits
end

#send_notification(session_id, method, params = nil) ⇒ Boolean

Sends a JSON-RPC notification to a specific client session via its SSE stream.

Parameters:

  • session_id (String)

    The ID of the client session to send the notification to.

  • method (String)

    The method name of the notification.

  • params (Hash, Array, nil) (defaults to: nil)

    The parameters for the notification (optional).

Returns:

  • (Boolean)

    True if the message was successfully enqueued, false otherwise (e.g., client not found).



# File 'lib/vector_mcp/transport/sse.rb', line 119

def send_notification(session_id, method, params = nil)
  message = { jsonrpc: "2.0", method: method }
  message[:params] = params if params
  enqueue_message(session_id, message)
end
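
To see the payload this method enqueues, the message construction can be reproduced on its own. In the sketch below, build_notification is a hypothetical helper that mirrors the first two lines of #send_notification. The method names are taken from the MCP specification and the parameter values are made up for illustration.

```ruby
require "json"

# Mirrors the hash built inside #send_notification (hypothetical helper name).
def build_notification(method, params = nil)
  message = { jsonrpc: "2.0", method: method }
  message[:params] = params if params # the key is omitted entirely when params is nil
  message
end

puts JSON.generate(build_notification("notifications/tools/list_changed"))
# {"jsonrpc":"2.0","method":"notifications/tools/list_changed"}

puts JSON.generate(build_notification("notifications/message", { level: "info", data: "ready" }))
# {"jsonrpc":"2.0","method":"notifications/message","params":{"level":"info","data":"ready"}}
```

Notifications carry no id field, so the client is not expected to reply to them.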