Skip to content

Commit

Permalink
Improve ws client - 1 (#10)
Browse files Browse the repository at this point in the history
* update

* update

* update
  • Loading branch information
wuminzhe authored Jul 30, 2024
1 parent 82e1d8a commit 7d0e299
Show file tree
Hide file tree
Showing 7 changed files with 70 additions and 106 deletions.
10 changes: 8 additions & 2 deletions examples/http_client_2.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@
ScaleRb.logger.level = Logger::DEBUG

client = ScaleRb::HttpClient.new('https://polkadot-rpc.dwellir.com')
block_number = 21585684
block_number = 21711742
block_hash = client.chain_getBlockHash(block_number)
storage = client.get_storage(block_hash, 'System', 'Events')
metadata = client.get_metadata(block_hash)

storage_query = ScaleRb::WsClient::StorageQuery.new(
pallet_name: 'System',
storage_name: 'Events',
)
storage = client.get_storage(block_hash, storage_query, metadata)
puts "block #{block_number}(#{block_hash}) has #{storage.length} events"
6 changes: 3 additions & 3 deletions examples/ws_client_1.rb
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
require 'scale_rb'

# ScaleRb.logger.level = Logger::DEBUG
ScaleRb.logger.level = Logger::DEBUG

ScaleRb::WsClient.start('wss://polkadot-rpc.dwellir.com') do |client|
block_hash = client.chain_getBlockHash(21585684)
runtime_version = client.state_getRuntimeVersion(block_hash)
puts runtime_version['specName']
puts runtime_version['specVersion']
puts runtime_version[:specName]
puts runtime_version[:specVersion]
end
11 changes: 8 additions & 3 deletions examples/ws_client_2.rb
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
require 'scale_rb'

ScaleRb.logger.level = Logger::DEBUG

# You can have multiple subscriptions at the same time
ScaleRb::WsClient.start('wss://polkadot-rpc.dwellir.com') do |client|
client.chain_subscribeNewHead do |head|
puts "Received new head at height: #{head['number'].to_i(16)}"
puts "Received new head at height: #{head[:number].to_i(16)}"
end

client.state_subscribeStorage do |storage|
block_hash = storage[:block]
changes = storage[:changes]
puts "Received #{changes.size} storage changes at block: #{block_hash}"
end
end
5 changes: 3 additions & 2 deletions examples/ws_client_3.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,20 @@

# ScaleRb.logger.level = Logger::DEBUG

# Unsubscribe after receiving 5 new heads
ScaleRb::WsClient.start('wss://polkadot-rpc.dwellir.com') do |client|
count = 0

subscription_id = client.chain_subscribeNewHead do |head|
count = count + 1

if count < 5
block_number = head['number'].to_i(16)
block_number = head[:number].to_i(16)
block_hash = client.chain_getBlockHash(block_number)
puts "Received new head at height: #{block_number}, block hash: #{block_hash}"
else
unsub_result = client.chain_unsubscribeNewHead(subscription_id)
puts "Unsubscribed from new heads: #{unsub_result}"
puts "Unsubscribe result: #{unsub_result}"
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/client/client_ext.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def get_storage(block_hash, storage_query, metadata = nil)
def query_storage_at(block_hash, storage_keys, type_id, default, registry)
result = state_queryStorageAt(storage_keys, block_hash)
result.map do |item|
item['changes'].map do |change|
item[:changes].map do |change|
storage_key = change[0]
data = change[1] || default
storage = data.nil? ? nil : PortableCodec.decode(type_id, data._to_bytes, registry)[0]
Expand Down
11 changes: 6 additions & 5 deletions lib/client/http_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def initialize(url)
raise 'url format is not correct' unless url.match?(url_regex)

@uri = URI.parse(url)
@supported_methods = request('rpc_methods', [])['methods']
@supported_methods = request('rpc_methods', [])[:methods]
end

def request(method, params = [])
Expand All @@ -37,11 +37,12 @@ def request(method, params = [])
response = http.request(request)
raise response unless response.is_a?(Net::HTTPOK)

body = JSON.parse(response.body)
# parse response, make key symbol
body = JSON.parse(response.body, symbolize_names: true)
ScaleRb.logger.debug "Response: #{body}"
raise body['error'] if body['error']
raise body[:error] if body[:error]

body['result']
body[:result]
end

def respond_to_missing?(*_args)
Expand All @@ -54,4 +55,4 @@ def method_missing(method, *args)
request(method.to_s, args)
end
end
end
end
131 changes: 41 additions & 90 deletions lib/client/ws_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,50 +9,34 @@
module ScaleRb
class WsClient
def self.start(url)
Async do |task|

Sync do |task|
endpoint = Async::HTTP::Endpoint.parse(url, alpn_protocols: Async::HTTP::Protocol::HTTP11.names)
client = WsClient.new

task.async do
Async::WebSocket::Client.connect(endpoint) do |connection|
Async do
while request = client.next_request
ScaleRb.logger.debug "Sending request: #{request.to_json}"
connection.write(request.to_json)
end
end

# inside main task
Async::WebSocket::Client.connect(endpoint) do |connection|
client = WsClient.new(connection)

recv_task = task.async do
while message = connection.read
data = JSON.parse(message)
data = message.parse
ScaleRb.logger.debug "Received message: #{data}"

Async do
task.async do
client.handle_response(data)
rescue => e
ScaleRb.logger.error "#{e.class}: #{e.message}"
ScaleRb.logger.error e.backtrace.join("\n")
task.stop
end
end
rescue => e
ScaleRb.logger.error "#{e.class}: #{e.message}"
ScaleRb.logger.error e.backtrace.join("\n")
ensure
task.stop
end
end

task.async do
client.supported_methods = client.rpc_methods()['methods']
client.supported_methods = client.rpc_methods()[:methods]
yield client
rescue => e
ScaleRb.logger.error "#{e.class}: #{e.message}"
ScaleRb.logger.error e.backtrace.join("\n")
task.stop

recv_task.wait
ensure
recv_task&.stop
end
end
end
end # Sync

end # start
end
end

Expand All @@ -61,8 +45,8 @@ class WsClient
include ClientExt
attr_accessor :supported_methods

def initialize
@queue = Async::Queue.new
def initialize(connection)
@connection = connection
@response_handler = ResponseHandler.new
@subscription_handler = SubscriptionHandler.new
@request_id = 1
Expand All @@ -87,7 +71,7 @@ def method_missing(method, *args)
raise "A subscribe method needs a block" unless block_given?

subscribe(method, args) do |notification|
yield notification['params']['result']
yield notification[:params][:result]
end
else
request(method, args)
Expand All @@ -111,14 +95,10 @@ def unsubscribe(method, subscription_id)
end
end

def next_request
@queue.dequeue
end

def handle_response(response)
if response.key?('id')
if response.key?(:id)
@response_handler.handle(response)
elsif response.key?('method')
elsif response.key?(:method)
@subscription_handler.handle(response)
else
puts "Received an unknown message: #{response}"
Expand All @@ -131,53 +111,34 @@ def request(method, params = [])
response_future = Async::Notification.new

@response_handler.register(@request_id, proc { |response|
# this is running in the main task
response_future.signal(response['result'])
response_future.signal(response[:result])
})

request = JsonRpcRequest.new(@request_id, method, params)
@queue.enqueue(request)
request = { jsonrpc: '2.0', id: @request_id, method: method, params: params }
ScaleRb.logger.debug "Sending request: #{request}"
@connection.write(request.to_json)

@request_id += 1

response_future.wait
end
end

class JsonRpcRequest
attr_reader :id, :method, :params

def initialize(id, method, params = {})
@id = id
@method = method
@params = params
end

def to_json(*_args)
{ jsonrpc: '2.0', id: @id, method: @method, params: @params }.to_json
end

# def to_s
# to_json
# end
end

class ResponseHandler
def initialize
@handlers = {}
@callbacks = {}
end

# handler: a proc with response data as param
def register(id, handler)
@handlers[id] = handler
# callback: a proc with response data as param
def register(id, callback)
@callbacks[id] = callback
end

def handle(response)
id = response['id']
if @handlers.key?(id)
handler = @handlers[id]
handler.call(response)
@handlers.delete(id)
id = response[:id]
if @callbacks.key?(id)
callback = @callbacks[id]
callback.call(response)
@callbacks.delete(id)
else
ScaleRb.logger.debug "Received a message with unknown id: #{response}"
end
Expand All @@ -186,33 +147,23 @@ def handle(response)

class SubscriptionHandler
def initialize
@subscriptions = {}
@callbacks = {}
end

def subscribe(subscription_id, handler)
@subscriptions[subscription_id] = handler
def subscribe(subscription_id, callback)
@callbacks[subscription_id] = callback
end

def unsubscribe(subscription_id)
@subscriptions.delete(subscription_id)
@callbacks.delete(subscription_id)
end

def handle(notification)
subscription_id = notification.dig('params', 'subscription')
subscription_id = notification.dig(:params, :subscription)
return if subscription_id.nil?

if @subscriptions.key?(subscription_id)
@subscriptions[subscription_id].call(notification)
else
# the subscription_id may be not registered.
# in client.subscribe function,
# ...
# subscription_id = request(method, params)
# @subscription_handler.subscribe(subscription_id, block)
# ...
# the request(method, params) may be slow, so the subscription_id may be not registered when the first notification comes.
sleep 0.01
handle(notification)
if @callbacks.key?(subscription_id)
@callbacks[subscription_id].call(notification)
end
end
end
Expand Down

0 comments on commit 7d0e299

Please sign in to comment.