Class: VectorMCP::Transport::SSE::ClientConnection

Inherits:
Object
Defined in:
lib/vector_mcp/transport/sse/client_connection.rb

Overview

Manages individual client connection state for SSE transport. Each client connection has a unique session ID, message queue, and streaming thread.

Constructor Details

#initialize(session_id, logger) ⇒ ClientConnection

Initializes a new client connection.

Parameters:

  • session_id (String)

    Unique identifier for this client session

  • logger (Logger)

    Logger instance for debugging and error reporting



# File 'lib/vector_mcp/transport/sse/client_connection.rb', line 16

def initialize(session_id, logger)
  @session_id = session_id
  @logger = logger
  @message_queue = Queue.new
  @stream_thread = nil
  @stream_io = nil
  @closed = false
  @mutex = Mutex.new

  logger.debug { "Client connection created: #{session_id}" }
end
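The constructor logs with the block form of `Logger#debug`, so the message string is only built when the logger's level admits DEBUG output. The following is a minimal sketch of that behavior using Ruby's standard `Logger`; the in-memory `StringIO` target, the `session-123` ID, and the `block_calls` counter are assumptions made for illustration and are not part of the library.

```ruby
require "logger"
require "stringio"

# Hypothetical setup: log to an in-memory buffer at INFO level,
# so DEBUG messages are filtered out.
log_output = StringIO.new
logger = Logger.new(log_output, level: Logger::INFO)

block_calls = 0
session_id = "session-123" # hypothetical session ID

# DEBUG is below the logger's level, so this block is never evaluated.
logger.debug do
  block_calls += 1
  "Client connection created: #{session_id}"
end

# INFO passes the level check, so this block runs and its result is written.
logger.info do
  block_calls += 1
  "Client connection created: #{session_id}"
end
```

With a production logger set above DEBUG, the interpolation in the constructor's log call therefore costs nothing per connection.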

Instance Attribute Details

#logger ⇒ Object (readonly)

Returns the value of attribute logger.



# File 'lib/vector_mcp/transport/sse/client_connection.rb', line 9

def logger
  @logger
end

#message_queue ⇒ Object (readonly)

Returns the value of attribute message_queue.



# File 'lib/vector_mcp/transport/sse/client_connection.rb', line 9

def message_queue
  @message_queue
end

#session_id ⇒ Object (readonly)

Returns the value of attribute session_id.



# File 'lib/vector_mcp/transport/sse/client_connection.rb', line 9

def session_id
  @session_id
end

#stream_io ⇒ Object

Returns the value of attribute stream_io.



# File 'lib/vector_mcp/transport/sse/client_connection.rb', line 10

def stream_io
  @stream_io
end

#stream_thread ⇒ Object

Returns the value of attribute stream_thread.



# File 'lib/vector_mcp/transport/sse/client_connection.rb', line 10

def stream_thread
  @stream_thread
end

Instance Method Details

#close ⇒ Object

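Closes the client connection and releases its resources: the message queue is closed, the stream I/O is closed, and a streaming thread that is still alive is killed and then joined for up to 1 second. This method is thread-safe and idempotent; calls after the first return immediately.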



# File 'lib/vector_mcp/transport/sse/client_connection.rb', line 37

def close
  @mutex.synchronize do
    return if @closed

    @closed = true
    logger.debug { "Closing client connection: #{session_id}" }

    # Close the message queue to signal streaming thread to stop
    @message_queue.close if @message_queue.respond_to?(:close)

    # Close the stream I/O if it exists
    begin
      @stream_io&.close
    rescue StandardError => e
      logger.warn { "Error closing stream I/O for #{session_id}: #{e.message}" }
    end

    # Stop the streaming thread
    if @stream_thread&.alive?
      @stream_thread.kill
      @stream_thread.join(1) # Wait up to 1 second for clean shutdown
    end

    logger.debug { "Client connection closed: #{session_id}" }
  end
end
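The guard at the top of `close` is what makes repeated or concurrent calls safe: the first caller flips `@closed` under the mutex, and every later caller returns before any cleanup runs. Below is a minimal sketch of just that guard. `OnceCloser` and `cleanup_runs` are hypothetical names introduced for illustration; the counter stands in for the real cleanup of the queue, the I/O object, and the thread.

```ruby
# Hypothetical stand-in, not the library class: it keeps only the guard logic.
class OnceCloser
  attr_reader :cleanup_runs

  def initialize
    @mutex = Mutex.new
    @closed = false
    @cleanup_runs = 0
  end

  def close
    @mutex.synchronize do
      # Later callers exit here; synchronize still releases the mutex.
      return if @closed

      @closed = true
      @cleanup_runs += 1 # stands in for closing the queue, I/O, and thread
    end
  end
end

closer = OnceCloser.new

# Eight threads race to close; only one of them performs the cleanup.
threads = Array.new(8) { Thread.new { closer.close } }
threads.each(&:join)

# A further call from the main thread returns immediately without deadlocking.
closer.close
```

Because the early `return` leaves the method from inside `synchronize`, the lock is released on the way out and later callers are not blocked.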

#closed? ⇒ Boolean

Checks whether the connection is closed.

Returns:

  • (Boolean)

    true if the connection is closed



# File 'lib/vector_mcp/transport/sse/client_connection.rb', line 31

def closed?
  @mutex.synchronize { @closed }
end

#dequeue_message ⇒ Hash?

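Dequeues the next message from the client's message queue. If the connection is already closed, this method returns nil immediately; otherwise it blocks until a message is available or the queue is closed.

Returns:

  • (Hash, nil)

    The next message, or nil if the connection or queue is closed or an error occurs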



# File 'lib/vector_mcp/transport/sse/client_connection.rb', line 89

def dequeue_message
  return nil if closed?

  begin
    @message_queue.pop
  rescue ClosedQueueError
    nil
  rescue StandardError => e
    logger.error { "Error dequeuing message for client #{session_id}: #{e.message}" }
    nil
  end
end
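A nil return value gives a streaming loop a natural stop condition. The sketch below shows the underlying behavior with a plain Ruby `Queue` rather than the library class: `pop` blocks while the queue is empty and returns nil once the queue has been closed and drained. The message hashes and method names are hypothetical placeholders.

```ruby
queue = Queue.new
received = []

# Consumer: pop blocks while the queue is empty and returns nil once the
# queue is closed and drained, which ends the loop.
consumer = Thread.new do
  while (message = queue.pop)
    received << message
  end
end

# Hypothetical JSON-RPC style payloads.
queue.push({ jsonrpc: "2.0", method: "ping" })
queue.push({ jsonrpc: "2.0", method: "notifications/progress" })

# Closing wakes the consumer after the remaining messages are consumed.
queue.close
consumer.join
```

One difference from the raw queue is worth noting: `dequeue_message` checks `closed?` first, so after the connection is closed it returns nil even if messages are still queued.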

#enqueue_message(message) ⇒ Boolean

Enqueues a message to be sent to this client. This method is thread-safe.

Parameters:

  • message (Hash)

    The JSON-RPC message to send

Returns:

  • (Boolean)

    true if the message was enqueued; false if the connection is closed or an error occurs



# File 'lib/vector_mcp/transport/sse/client_connection.rb', line 69

def enqueue_message(message)
  return false if closed?

  begin
    @message_queue.push(message)
    logger.debug { "Message enqueued for client #{session_id}: #{message.inspect}" }
    true
  rescue ClosedQueueError
    logger.warn { "Attempted to enqueue message to closed queue for client #{session_id}" }
    false
  rescue StandardError => e
    logger.error { "Error enqueuing message for client #{session_id}: #{e.message}" }
    false
  end
end
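The `rescue ClosedQueueError` branch covers the window between the `closed?` check and the `push`, during which another thread may close the queue. The sketch below shows the standard `Queue` behavior that the branch relies on; `try_push` is a hypothetical helper written for this example and is not part of the library.

```ruby
# Hypothetical helper mirroring the rescue structure of enqueue_message:
# it converts a push onto a closed queue into a false return value.
def try_push(queue, message)
  queue.push(message)
  true
rescue ClosedQueueError
  false
end

queue = Queue.new

before_close = try_push(queue, { id: 1 }) # queue is open, push succeeds
queue.close
after_close = try_push(queue, { id: 2 })  # push raises ClosedQueueError
```

Returning a boolean instead of raising lets a caller that broadcasts to many clients skip dead connections without wrapping each send in its own error handling.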

#queue_size ⇒ Integer

Returns the current queue size, or 0 if the size cannot be read.

Returns:

  • (Integer)

    Number of messages waiting in the queue



# File 'lib/vector_mcp/transport/sse/client_connection.rb', line 105

def queue_size
  @message_queue.size
rescue StandardError
  0
end