Class: VectorMCP::Transport::SSE::ClientConnection
- Inherits:
-
Object
- Object
- VectorMCP::Transport::SSE::ClientConnection
- Defined in:
- lib/vector_mcp/transport/sse/client_connection.rb
Overview
Manages individual client connection state for SSE transport. Each client connection has a unique session ID, message queue, and streaming thread.
Instance Attribute Summary collapse
-
#logger ⇒ Object
readonly
Returns the value of attribute logger.
-
#message_queue ⇒ Object
readonly
Returns the value of attribute message_queue.
-
#session_id ⇒ Object
readonly
Returns the value of attribute session_id.
-
#stream_io ⇒ Object
Returns the value of attribute stream_io.
-
#stream_thread ⇒ Object
Returns the value of attribute stream_thread.
Instance Method Summary collapse
-
#close ⇒ Object
Closes the client connection and cleans up resources.
-
#closed? ⇒ Boolean
Checks if the connection is closed.
-
#dequeue_message ⇒ Hash?
Dequeues the next message from the client's message queue.
-
#enqueue_message(message) ⇒ Boolean
Enqueues a message to be sent to this client.
-
#initialize(session_id, logger) ⇒ ClientConnection
constructor
Initializes a new client connection.
-
#queue_size ⇒ Integer
Gets the current queue size.
Constructor Details
#initialize(session_id, logger) ⇒ ClientConnection
Initializes a new client connection.
16 17 18 19 20 21 22 23 24 25 26 |
# File 'lib/vector_mcp/transport/sse/client_connection.rb', line 16 def initialize(session_id, logger) @session_id = session_id @logger = logger @message_queue = Queue.new @stream_thread = nil @stream_io = nil @closed = false @mutex = Mutex.new logger.debug { "Client connection created: #{session_id}" } end |
Instance Attribute Details
#logger ⇒ Object (readonly)
Returns the value of attribute logger.
9 10 11 |
# File 'lib/vector_mcp/transport/sse/client_connection.rb', line 9 def logger @logger end |
#message_queue ⇒ Object (readonly)
Returns the value of attribute message_queue.
9 10 11 |
# File 'lib/vector_mcp/transport/sse/client_connection.rb', line 9 def @message_queue end |
#session_id ⇒ Object (readonly)
Returns the value of attribute session_id.
9 10 11 |
# File 'lib/vector_mcp/transport/sse/client_connection.rb', line 9 def session_id @session_id end |
#stream_io ⇒ Object
Returns the value of attribute stream_io.
10 11 12 |
# File 'lib/vector_mcp/transport/sse/client_connection.rb', line 10 def stream_io @stream_io end |
#stream_thread ⇒ Object
Returns the value of attribute stream_thread.
10 11 12 |
# File 'lib/vector_mcp/transport/sse/client_connection.rb', line 10 def stream_thread @stream_thread end |
Instance Method Details
#close ⇒ Object
Closes the client connection and cleans up resources. This method is thread-safe and can be called multiple times.
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
# File 'lib/vector_mcp/transport/sse/client_connection.rb', line 37 def close @mutex.synchronize do return if @closed @closed = true logger.debug { "Closing client connection: #{session_id}" } # Close the message queue to signal streaming thread to stop @message_queue.close if @message_queue.respond_to?(:close) # Close the stream I/O if it exists begin @stream_io&.close rescue StandardError => e logger.warn { "Error closing stream I/O for #{session_id}: #{e.}" } end # Stop the streaming thread if @stream_thread&.alive? @stream_thread.kill @stream_thread.join(1) # Wait up to 1 second for clean shutdown end logger.debug { "Client connection closed: #{session_id}" } end end |
#closed? ⇒ Boolean
Checks if the connection is closed
31 32 33 |
# File 'lib/vector_mcp/transport/sse/client_connection.rb', line 31 def closed? @mutex.synchronize { @closed } end |
#dequeue_message ⇒ Hash?
Dequeues the next message from the client's message queue. This method blocks until a message is available or the queue is closed.
89 90 91 92 93 94 95 96 97 98 99 100 |
# File 'lib/vector_mcp/transport/sse/client_connection.rb', line 89 def return nil if closed? begin @message_queue.pop rescue ClosedQueueError nil rescue StandardError => e logger.error { "Error dequeuing message for client #{session_id}: #{e.}" } nil end end |
#enqueue_message(message) ⇒ Boolean
Enqueues a message to be sent to this client. This method is thread-safe.
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 |
# File 'lib/vector_mcp/transport/sse/client_connection.rb', line 69 def () return false if closed? begin @message_queue.push() logger.debug { "Message enqueued for client #{session_id}: #{.inspect}" } true rescue ClosedQueueError logger.warn { "Attempted to enqueue message to closed queue for client #{session_id}" } false rescue StandardError => e logger.error { "Error enqueuing message for client #{session_id}: #{e.}" } false end end |
#queue_size ⇒ Integer
Gets the current queue size
105 106 107 108 109 |
# File 'lib/vector_mcp/transport/sse/client_connection.rb', line 105 def queue_size @message_queue.size rescue StandardError 0 end |