diff --git a/examples/http_client_2.rb b/examples/http_client_2.rb index 0b17d0f..fc19a5d 100644 --- a/examples/http_client_2.rb +++ b/examples/http_client_2.rb @@ -3,7 +3,13 @@ ScaleRb.logger.level = Logger::DEBUG client = ScaleRb::HttpClient.new('https://polkadot-rpc.dwellir.com') -block_number = 21585684 +block_number = 21711742 block_hash = client.chain_getBlockHash(block_number) -storage = client.get_storage(block_hash, 'System', 'Events') +metadata = client.get_metadata(block_hash) + +storage_query = ScaleRb::WsClient::StorageQuery.new( + pallet_name: 'System', + storage_name: 'Events', +) +storage = client.get_storage(block_hash, storage_query, metadata) puts "block #{block_number}(#{block_hash}) has #{storage.length} events" diff --git a/examples/ws_client_1.rb b/examples/ws_client_1.rb index 57a7e69..ed6d37c 100644 --- a/examples/ws_client_1.rb +++ b/examples/ws_client_1.rb @@ -1,10 +1,10 @@ require 'scale_rb' -# ScaleRb.logger.level = Logger::DEBUG +ScaleRb.logger.level = Logger::DEBUG ScaleRb::WsClient.start('wss://polkadot-rpc.dwellir.com') do |client| block_hash = client.chain_getBlockHash(21585684) runtime_version = client.state_getRuntimeVersion(block_hash) - puts runtime_version['specName'] - puts runtime_version['specVersion'] + puts runtime_version[:specName] + puts runtime_version[:specVersion] end diff --git a/examples/ws_client_2.rb b/examples/ws_client_2.rb index 212a5bc..82e7d39 100644 --- a/examples/ws_client_2.rb +++ b/examples/ws_client_2.rb @@ -1,9 +1,14 @@ require 'scale_rb' -ScaleRb.logger.level = Logger::DEBUG - +# You can have multiple subscriptions at the same time ScaleRb::WsClient.start('wss://polkadot-rpc.dwellir.com') do |client| client.chain_subscribeNewHead do |head| - puts "Received new head at height: #{head['number'].to_i(16)}" + puts "Received new head at height: #{head[:number].to_i(16)}" + end + + client.state_subscribeStorage do |storage| + block_hash = storage[:block] + changes = storage[:changes] + puts "Received #{changes.size} storage changes at block: #{block_hash}" end end diff --git a/examples/ws_client_3.rb b/examples/ws_client_3.rb index bf2e678..006d48a 100644 --- a/examples/ws_client_3.rb +++ b/examples/ws_client_3.rb @@ -2,6 +2,7 @@ # ScaleRb.logger.level = Logger::DEBUG +# Unsubscribe after receiving 5 new heads ScaleRb::WsClient.start('wss://polkadot-rpc.dwellir.com') do |client| count = 0 @@ -9,12 +10,12 @@ count = count + 1 if count < 5 - block_number = head['number'].to_i(16) + block_number = head[:number].to_i(16) block_hash = client.chain_getBlockHash(block_number) puts "Received new head at height: #{block_number}, block hash: #{block_hash}" else unsub_result = client.chain_unsubscribeNewHead(subscription_id) - puts "Unsubscribed from new heads: #{unsub_result}" + puts "Unsubscribe result: #{unsub_result}" end end diff --git a/lib/client/client_ext.rb b/lib/client/client_ext.rb index 021fcf1..70dcad9 100644 --- a/lib/client/client_ext.rb +++ b/lib/client/client_ext.rb @@ -42,7 +42,7 @@ def get_storage(block_hash, storage_query, metadata = nil) def query_storage_at(block_hash, storage_keys, type_id, default, registry) result = state_queryStorageAt(storage_keys, block_hash) result.map do |item| - item['changes'].map do |change| + item[:changes].map do |change| storage_key = change[0] data = change[1] || default storage = data.nil? ? nil : PortableCodec.decode(type_id, data._to_bytes, registry)[0] diff --git a/lib/client/http_client.rb b/lib/client/http_client.rb index 5b4d644..6c111fd 100644 --- a/lib/client/http_client.rb +++ b/lib/client/http_client.rb @@ -17,7 +17,7 @@ def initialize(url) raise 'url format is not correct' unless url.match?(url_regex) @uri = URI.parse(url) - @supported_methods = request('rpc_methods', [])['methods'] + @supported_methods = request('rpc_methods', [])[:methods] end def request(method, params = []) @@ -37,11 +37,12 @@ def request(method, params = []) response = http.request(request) raise response unless response.is_a?(Net::HTTPOK) - body = JSON.parse(response.body) + # parse response, make key symbol + body = JSON.parse(response.body, symbolize_names: true) ScaleRb.logger.debug "Response: #{body}" - raise body['error'] if body['error'] + raise body[:error] if body[:error] - body['result'] + body[:result] end def respond_to_missing?(*_args) @@ -54,4 +55,4 @@ def method_missing(method, *args) request(method.to_s, args) end end -end \ No newline at end of file +end diff --git a/lib/client/ws_client.rb b/lib/client/ws_client.rb index 2f8138f..6ce5dd0 100644 --- a/lib/client/ws_client.rb +++ b/lib/client/ws_client.rb @@ -9,50 +9,34 @@ module ScaleRb class WsClient def self.start(url) - Async do |task| + + Sync do |task| endpoint = Async::HTTP::Endpoint.parse(url, alpn_protocols: Async::HTTP::Protocol::HTTP11.names) - client = WsClient.new - - task.async do - Async::WebSocket::Client.connect(endpoint) do |connection| - Async do - while request = client.next_request - ScaleRb.logger.debug "Sending request: #{request.to_json}" - connection.write(request.to_json) - end - end - # inside main task + Async::WebSocket::Client.connect(endpoint) do |connection| + client = WsClient.new(connection) + + recv_task = task.async do while message = connection.read - data = JSON.parse(message) + data = message.parse ScaleRb.logger.debug "Received message: #{data}" - Async do + task.async do client.handle_response(data) - rescue => e - ScaleRb.logger.error "#{e.class}: #{e.message}" - ScaleRb.logger.error e.backtrace.join("\n") - task.stop end end - rescue => e - ScaleRb.logger.error "#{e.class}: #{e.message}" - ScaleRb.logger.error e.backtrace.join("\n") - ensure - task.stop end - end - task.async do - client.supported_methods = client.rpc_methods()['methods'] + client.supported_methods = client.rpc_methods()[:methods] yield client - rescue => e - ScaleRb.logger.error "#{e.class}: #{e.message}" - ScaleRb.logger.error e.backtrace.join("\n") - task.stop + + recv_task.wait + ensure + recv_task&.stop end - end - end + end # Sync + + end # start end end @@ -61,8 +45,8 @@ class WsClient include ClientExt attr_accessor :supported_methods - def initialize - @queue = Async::Queue.new + def initialize(connection) + @connection = connection @response_handler = ResponseHandler.new @subscription_handler = SubscriptionHandler.new @request_id = 1 @@ -87,7 +71,7 @@ def method_missing(method, *args) raise "A subscribe method needs a block" unless block_given? subscribe(method, args) do |notification| - yield notification['params']['result'] + yield notification[:params][:result] end else request(method, args) @@ -111,14 +95,10 @@ def unsubscribe(method, subscription_id) end end - def next_request - @queue.dequeue - end - def handle_response(response) - if response.key?('id') + if response.key?(:id) @response_handler.handle(response) - elsif response.key?('method') + elsif response.key?(:method) @subscription_handler.handle(response) else puts "Received an unknown message: #{response}" @@ -131,53 +111,34 @@ def request(method, params = []) response_future = Async::Notification.new @response_handler.register(@request_id, proc { |response| - # this is running in the main task - response_future.signal(response['result']) + response_future.signal(response[:result]) }) - request = JsonRpcRequest.new(@request_id, method, params) - @queue.enqueue(request) + request = { jsonrpc: '2.0', id: @request_id, method: method, params: params } + ScaleRb.logger.debug "Sending request: #{request}" + @connection.write(request.to_json) @request_id += 1 - response_future.wait end end - class JsonRpcRequest - attr_reader :id, :method, :params - - def initialize(id, method, params = {}) - @id = id - @method = method - @params = params - end - - def to_json(*_args) - { jsonrpc: '2.0', id: @id, method: @method, params: @params }.to_json - end - - # def to_s - # to_json - # end - end - class ResponseHandler def initialize - @handlers = {} + @callbacks = {} end - # handler: a proc with response data as param - def register(id, handler) - @handlers[id] = handler + # callback: a proc with response data as param + def register(id, callback) + @callbacks[id] = callback end def handle(response) - id = response['id'] - if @handlers.key?(id) - handler = @handlers[id] - handler.call(response) - @handlers.delete(id) + id = response[:id] + if @callbacks.key?(id) + callback = @callbacks[id] + callback.call(response) + @callbacks.delete(id) else ScaleRb.logger.debug "Received a message with unknown id: #{response}" end @@ -186,33 +147,23 @@ def handle(response) class SubscriptionHandler def initialize - @subscriptions = {} + @callbacks = {} end - def subscribe(subscription_id, handler) - @subscriptions[subscription_id] = handler + def subscribe(subscription_id, callback) + @callbacks[subscription_id] = callback end def unsubscribe(subscription_id) - @subscriptions.delete(subscription_id) + @callbacks.delete(subscription_id) end def handle(notification) - subscription_id = notification.dig('params', 'subscription') + subscription_id = notification.dig(:params, :subscription) return if subscription_id.nil? - if @subscriptions.key?(subscription_id) - @subscriptions[subscription_id].call(notification) - else - # the subscription_id may be not registered. - # in client.subscribe function, - # ... - # subscription_id = request(method, params) - # @subscription_handler.subscribe(subscription_id, block) - # ... - # the request(method, params) may be slow, so the subscription_id may be not registered when the first notification comes. - sleep 0.01 - handle(notification) + if @callbacks.key?(subscription_id) + @callbacks[subscription_id].call(notification) end end end