Class: VectorMCP::Transport::SSE::StreamManager

Inherits:
Object
Defined in:
lib/vector_mcp/transport/sse/stream_manager.rb

Overview

Manages Server-Sent Events streaming for client connections. Handles creation of streaming responses and message broadcasting.
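For orientation, a Server-Sent Events frame is plain text: an optional `event:` line, one or more `data:` lines, and a blank line that ends the frame. The sketch below builds such a frame. The helper name `sse_frame` and the example URL are hypothetical; the library's own `format_sse_event` is not shown on this page and may differ.

```ruby
# Hypothetical helper (not the library's format_sse_event): builds one SSE frame.
# Each line of the payload gets its own "data:" field, and an empty line
# terminates the frame.
def sse_frame(event, data)
  payload = data.to_s.lines(chomp: true).map { |line| "data: #{line}\n" }.join
  "event: #{event}\n#{payload}\n"
end

# The URL is an illustrative assumption, not a path defined by the library.
frame = sse_frame("endpoint", "/mcp/message?session_id=abc123")
puts frame
```

A client reading the stream treats the trailing blank line as the end of the event and dispatches it under the name given in the `event:` field.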

Class Method Details

.create_sse_stream(client_conn, endpoint_url, logger) ⇒ Enumerator

Creates an SSE streaming response body for a client connection.

Parameters:

  • client_conn (ClientConnection)

    The client connection to stream to

  • endpoint_url (String)

    The URL for the client to POST messages to

  • logger (Logger)

    Logger instance for debugging

Returns:

  • (Enumerator)

    Rack-compatible streaming response body



# File 'lib/vector_mcp/transport/sse/stream_manager.rb', line 16

def create_sse_stream(client_conn, endpoint_url, logger)
  Enumerator.new do |yielder|
    # Send initial endpoint event
    yielder << format_sse_event("endpoint", endpoint_url)
    logger.debug { "Sent endpoint event to client #{client_conn.session_id}: #{endpoint_url}" }

    # Start streaming thread for this client
    client_conn.stream_thread = Thread.new do
      stream_messages_to_client(client_conn, yielder, logger)
    end

    # Keep the connection alive by yielding from the streaming thread
    client_conn.stream_thread.join
  rescue StandardError => e
    logger.error { "Error in SSE stream for client #{client_conn.session_id}: #{e.message}\n#{e.backtrace.join("\n")}" }
  ensure
    logger.debug { "SSE stream ended for client #{client_conn.session_id}" }
    client_conn.close
  end
end
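As a rough illustration of how a Rack-compatible body like this is consumed, the sketch below uses a stand-in Enumerator in place of the value returned by `create_sse_stream`. The header values are the ones commonly used for SSE responses and are an assumption here, as are the example frames; the page does not show which headers the transport actually sets.

```ruby
# Stand-in for the Enumerator returned by create_sse_stream. Assumption: the
# real body yields pre-formatted SSE strings, as the source above suggests.
body = Enumerator.new do |yielder|
  yielder << "event: endpoint\ndata: /message?session_id=abc123\n\n"
  yielder << "event: message\ndata: {\"jsonrpc\":\"2.0\"}\n\n"
end

# Typical SSE response headers (assumed, not taken from the library).
headers = {
  "content-type" => "text/event-stream",
  "cache-control" => "no-cache"
}

# A Rack response is a [status, headers, body] triplet.
rack_response = [200, headers, body]

# A Rack server consumes the body by calling #each and writing every chunk
# to the socket as it arrives; here the chunks are simply collected.
chunks = []
rack_response[2].each { |chunk| chunks << chunk }
```

Because the server drives the body through `#each`, the block passed to `Enumerator.new` only runs once the response starts streaming, which is why the real method can block on the streaming thread until the client disconnects.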

.enqueue_message(client_conn, message) ⇒ Boolean

Enqueues a message to a specific client connection.

Parameters:

  • client_conn (ClientConnection)

    The target client connection

  • message (Hash)

    The JSON-RPC message to send

Returns:

  • (Boolean)

    true if the message was enqueued successfully; false if the connection is nil or already closed



# File 'lib/vector_mcp/transport/sse/stream_manager.rb', line 42

def enqueue_message(client_conn, message)
  return false unless client_conn && !client_conn.closed?

  client_conn.enqueue_message(message)
end
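The guard in the method above can be exercised with a small stand-in object. In this sketch `FakeConnection` is hypothetical and implements only the methods the guard touches; it also assumes that the real `ClientConnection#enqueue_message` returns a truthy value on success, which this page does not confirm.

```ruby
# Hypothetical stand-in for ClientConnection.
class FakeConnection
  attr_reader :queue

  def initialize
    @queue = []
    @closed = false
  end

  def closed?
    @closed
  end

  def close
    @closed = true
  end

  # Assumption: the real method returns a truthy value on success.
  def enqueue_message(message)
    @queue << message
    true
  end
end

# Same guard as the method above, copied so the sketch runs on its own.
def enqueue_message(client_conn, message)
  return false unless client_conn && !client_conn.closed?

  client_conn.enqueue_message(message)
end

conn = FakeConnection.new
ping = { jsonrpc: "2.0", method: "ping", id: 1 }

sent_open = enqueue_message(conn, ping)   # open connection: message is queued
conn.close
sent_closed = enqueue_message(conn, ping) # closed connection: rejected
sent_nil = enqueue_message(nil, ping)     # missing connection: rejected
```

Checking for `nil` and `closed?` before delegating means callers can broadcast to a list of connections without first filtering out the ones that have gone away.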