Class: VectorMCP::Transport::SSE

Inherits:
Object
  • Object
show all
Defined in:
lib/vector_mcp/transport/sse.rb,
lib/vector_mcp/transport/sse/puma_config.rb,
lib/vector_mcp/transport/sse/stream_manager.rb,
lib/vector_mcp/transport/sse/message_handler.rb,
lib/vector_mcp/transport/sse/client_connection.rb

Overview

Implements the Model Context Protocol transport over HTTP using Server-Sent Events (SSE) for server-to-client messages and HTTP POST for client-to-server messages. This transport uses Puma as the HTTP server with Ruby threading for concurrency.

It provides two main HTTP endpoints:

  1. SSE Endpoint (<path_prefix>/sse): Clients connect here via GET to establish an SSE stream. The server sends an initial event: endpoint with a unique URL for the client to POST messages back. Subsequent messages from the server (responses, notifications) are sent as event: message.
  2. Message Endpoint (<path_prefix>/message): Clients POST JSON-RPC messages here. The session_id (obtained from the SSE endpoint event) must be included as a query parameter. The server responds with a 202 Accepted and then sends the actual JSON-RPC response/error asynchronously over the client's established SSE stream.

Examples:

Basic Usage with a Server

server = VectorMCP::Server.new("my-sse-server")
# ... register tools, resources, prompts ...
transport = VectorMCP::Transport::SSE.new(server, port: 8080)
server.run(transport: transport)

Defined Under Namespace

Classes: ClientConnection, MessageHandler, PumaConfig, StreamManager

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(server, options = {}) ⇒ SSE

Initializes a new SSE transport.

Parameters:

  • server (VectorMCP::Server)

    The server instance that will handle messages.

  • options (Hash) (defaults to: {})

    Configuration options for the transport.

Options Hash (options):

  • :host (String) — default: "localhost"

    The hostname or IP to bind to.

  • :port (Integer) — default: 8000

    The port to listen on.

  • :path_prefix (String) — default: "/mcp"

    The base path for HTTP endpoints.



53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
# File 'lib/vector_mcp/transport/sse.rb', line 53

def initialize(server, options = {})
  @server = server
  @logger = server.logger
  @host = options[:host] || "localhost"
  @port = options[:port] || 8000
  prefix = options[:path_prefix] || "/mcp"
  @path_prefix = prefix.start_with?("/") ? prefix : "/#{prefix}"
  @path_prefix = @path_prefix.delete_suffix("/")
  @sse_path = "#{@path_prefix}/sse"
  @message_path = "#{@path_prefix}/message"

  # Thread-safe client storage using concurrent-ruby
  @clients = Concurrent::Hash.new
  @session = nil # Global session for this transport instance, initialized in run
  @puma_server = nil
  @running = false

  logger.debug { "SSE Transport initialized with prefix: #{@path_prefix}, SSE path: #{@sse_path}, Message path: #{@message_path}" }
end

Instance Attribute Details

#hostString (readonly)

The hostname or IP address the server will bind to.

Returns:

  • (String)

    the current value of host



43
44
45
# File 'lib/vector_mcp/transport/sse.rb', line 43

def host
  @host
end

#loggerLogger (readonly)

The logger instance, shared with the server.

Returns:

  • (Logger)

    the current value of logger



43
44
45
# File 'lib/vector_mcp/transport/sse.rb', line 43

def logger
  @logger
end

#path_prefixString (readonly)

The base URL path for MCP endpoints (e.g., "/mcp").

Returns:

  • (String)

    the current value of path_prefix



43
44
45
# File 'lib/vector_mcp/transport/sse.rb', line 43

def path_prefix
  @path_prefix
end

#portInteger (readonly)

The port number the server will listen on.

Returns:

  • (Integer)

    the current value of port



43
44
45
# File 'lib/vector_mcp/transport/sse.rb', line 43

def port
  @port
end

#serverVectorMCP::Server (readonly)

The server instance this transport is bound to.

Returns:



43
44
45
# File 'lib/vector_mcp/transport/sse.rb', line 43

def server
  @server
end

Instance Method Details

#broadcast_notification(method, params = nil) ⇒ void

This method returns an undefined value.

Broadcasts a JSON-RPC notification to all currently connected client sessions.

Parameters:

  • method (String)

    The method name of the notification.

  • params (Hash, Array, nil) (defaults to: nil)

    The parameters for the notification (optional).



130
131
132
133
134
135
136
137
138
# File 'lib/vector_mcp/transport/sse.rb', line 130

def broadcast_notification(method, params = nil)
  logger.debug { "Broadcasting notification '#{method}' to #{@clients.size} client(s)" }
  message = { jsonrpc: "2.0", method: method }
  message[:params] = params if params

  @clients.each_value do |client_conn|
    StreamManager.enqueue_message(client_conn, message)
  end
end

#build_rack_app(session = nil) ⇒ self

Provides compatibility for tests that expect a build_rack_app helper. Since the transport itself is a Rack app (defines #call), it returns self.

Parameters:

  • session (VectorMCP::Session, nil) (defaults to: nil)

    An optional session to persist for testing.

Returns:

  • (self)

    The transport instance itself.



145
146
147
148
# File 'lib/vector_mcp/transport/sse.rb', line 145

def build_rack_app(session = nil)
  @session = session if session
  self
end

#call(env) ⇒ Array(Integer, Hash, Object)

Handles incoming HTTP requests. This is the entry point for the Rack application. It routes requests to the appropriate handler based on the path.

Parameters:

  • env (Hash)

    The Rack environment hash.

Returns:

  • (Array(Integer, Hash, Object))

    A standard Rack response triplet: [status, headers, body].



93
94
95
96
97
98
99
100
101
102
103
104
105
# File 'lib/vector_mcp/transport/sse.rb', line 93

def call(env)
  start_time = Time.now
  path = env["PATH_INFO"]
  http_method = env["REQUEST_METHOD"]
  logger.info "Received #{http_method} request for #{path}"

  status, headers, body = route_request(path, env)

  log_response(http_method, path, start_time, status)
  [status, headers, body]
rescue StandardError => e
  handle_call_error(http_method, path, e)
end

#runvoid

This method returns an undefined value.

Starts the SSE transport, creating a shared session and launching the Puma server. This method will block until the server is stopped (e.g., via SIGINT/SIGTERM).

Raises:

  • (StandardError)

    if there's a fatal error during server startup.



78
79
80
81
82
83
84
# File 'lib/vector_mcp/transport/sse.rb', line 78

def run
  logger.info("Starting server with Puma SSE transport on #{@host}:#{@port}")
  create_session
  start_puma_server
rescue StandardError => e
  handle_fatal_error(e)
end

#send_notification(session_id, method, params = nil) ⇒ Boolean

Sends a JSON-RPC notification to a specific client session via its SSE stream.

Parameters:

  • session_id (String)

    The ID of the client session to send the notification to.

  • method (String)

    The method name of the notification.

  • params (Hash, Array, nil) (defaults to: nil)

    The parameters for the notification (optional).

Returns:

  • (Boolean)

    True if the message was successfully enqueued, false otherwise (e.g., client not found).



115
116
117
118
119
120
121
122
123
# File 'lib/vector_mcp/transport/sse.rb', line 115

def send_notification(session_id, method, params = nil)
  message = { jsonrpc: "2.0", method: method }
  message[:params] = params if params

  client_conn = @clients[session_id]
  return false unless client_conn

  StreamManager.enqueue_message(client_conn, message)
end

#stopObject

Stops the transport and cleans up resources



151
152
153
154
155
156
# File 'lib/vector_mcp/transport/sse.rb', line 151

def stop
  @running = false
  cleanup_clients
  @puma_server&.stop
  logger.info("SSE transport stopped")
end