Design feedback #7

Open
ioquatix opened this issue Jul 15, 2024 · 11 comments

@ioquatix

As you requested, I'll give my general feedback.

Prefer Sync for top level entry points

Async do |task|

    def self.start(url)
      Async do |task|

I recommend using Sync here.

Sync{...} is semantically equivalent to Async{...}.wait: it still starts the event loop if needed, but it propagates exceptions to the caller. The benefit is that you probably want this method to be synchronous, i.e. when a user calls it, it "blocks" until completion rather than returning an Async::Task. If there is any error, it propagates up like a normal exception. Users can still write Async{WsClient.start ...} if they want to create a task, and there is no overhead in that case because Sync becomes a no-op.

Prefer to use Message#parse

data = JSON.parse(message)

            while message = connection.read
              data = JSON.parse(message)

You can write message.parse here if you prefer.

Consider using Console for logging

ScaleRb.logger.debug "Received message: #{data}"

I recommend using Console.info and so on for logging. This ties into the rest of Async and generally provides a good experience for users adopting your program within a wider eco-system of asynchronous components.

You don't need to execute this in a child task

task.async do
client.supported_methods = client.request('rpc_methods')['methods']
yield client

As this is a "tail call" of your async logic, you don't need to put it in a child task.

Alternatively, you might like to put your message loop at the top level and put your user code in a child task.

Error handling

I think your error handling is okay, but it could probably be simpler. It's nice if you can get tasks to propagate their errors (consider waiting on them).

Perhaps consider using Async::Variable

response_future = Async::Notification.new

Notifications are edge-triggered, not level-triggered. This might cause odd behaviour if you aren't waiting on the notification at the time it is signalled. Consider using Async::Variable if you are "waiting for a result".

It's okay to call connection.write(request.to_json) directly from the point where you make the request, assuming that you are not trying to handle reconnects etc. You are using a request queue, and you might find the code is simpler if you just directly write the request to the web-socket and read the response.

This is my initial feedback.

@wuminzhe
Owner

@ioquatix thanks for your great feedback. I'm a bit busy these days. I will study and improve my wsclient in a few days. thanks again.

@wuminzhe
Owner

Prefer Sync for top level entry points
If there is any error, it will propagate up like a normal exception.

Sync do |task|

I tried this example. But puts e.message did not run.

begin
  ScaleRb::WsClient.start('wss://polkadot-rpc.dwellir.com') do |client|
    raise 'hello'
  end
rescue => e
  puts e.message
end

Can you explain more? @ioquatix

@ioquatix
Author

It means the exception is not being raised within the sync block. Do you get any log about "task maybe failed with unhandled exception"?

@wuminzhe
Owner

It means the exception is not being raised within the sync block. Do you get any log about "task maybe failed with unhandled exception"?

yes, I got this message:

1.14s: Async::Task
      | Task may have ended with unhandled exception.
      |   RuntimeError: hello

@ioquatix
Author

It means you are not waiting on that task from anywhere so when the exception occurs it's not being handled.

@wuminzhe
Owner

@ioquatix What's the error propagation rule between a task and its subtasks?

@ioquatix
Author

ioquatix commented Jul 30, 2024

Async does not force you to use structured concurrency, but it tries to make it easy (fall into a pit of success).

In general, think about it like this: every task you create is the owner of any child tasks it creates. If you stop a parent task, it should stop all of its child tasks, and that should make sense, e.g. a parent task might represent an async HTTP server, and each task within that might represent one request being processed.

Error handling should propagate up, like exceptions, but of course it's more tricky when you have separate tasks. In Async, tasks behave like promises; in other words, they have a result, and you can wait on them. Ruby has an exception model that propagates errors up. For a simple linear program, this propagates up to the entry point of the program. For threads, the thread itself will exit, and for Async tasks, the task itself will finish in the error state.
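The thread case can be seen with the standard library alone. This is only an analogy for how a task holds its error until someone waits on it, not Async code:

```ruby
thread = Thread.new do
  # Silence the default stderr report so the output stays clean.
  Thread.current.report_on_exception = false
  raise 'hello'
end

message =
  begin
    thread.value # joining re-raises the thread's exception in the caller
  rescue => error
    error.message
  end

puts message
```

If nothing ever joins the thread, the exception stays inside it, which mirrors a task that nobody waits on.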

This is documented here: https://socketry.github.io/async/guides/asynchronous-tasks/index#task-lifecycle. You can see that an "unhandled StandardError derived exception causes a task to enter the failed state". When a task enters the failed state, waiting on it will propagate the error. A parent task should usually be waiting on the child task, and that task should then propagate the error to the parent. By this logic, unhandled exceptions propagate upwards through your concurrent tasks.

However, if your error is somewhat expected - e.g. if an individual HTTP request fails, it shouldn't fail the entire server - then you should handle it within that context, e.g.

Async do |task|
  while request = next_request
    task.async do
      write_response(handle(request))
    rescue => error
      write_response(error_response(error))
    end
  end
end

In the sub-request, the error is explicitly handled and isn't propagated up.

You would need to decide what kind of error handling makes the most sense for your program, but in a fan-out scenario, usually each child task should know how to handle the errors. If not, you can adopt this model:

Async do |task|
  barrier = Async::Barrier.new
  
  while request = next_request
    barrier.async do |task|
      write_response(handle(request))
    end
  end

  barrier.wait # will raise an error if the child task raises an error.
ensure
  barrier.stop # ensure all tasks are stopped.
end

You can read more about it here: https://socketry.github.io/async/guides/best-practices/index.html#use-a-barrier-to-wait-for-all-tasks-to-complete.

Let me know if that doesn't clear things up.

@wuminzhe
Owner

wuminzhe commented Aug 2, 2024

When a task enters the failed state, waiting on it will propagate the error.

If there are 2 or more subtasks, how do I propagate errors upward from any one of them to their parent task?

Is there any way to behave like js's await Promise.all(..)? Any error raised by a promise will propagate upwards.

@ioquatix

@ioquatix
Author

ioquatix commented Aug 2, 2024

Yes, use the barrier.

@wuminzhe
Owner

wuminzhe commented Aug 3, 2024

Yes, use the barrier.

barrier seems to behave differently from Promise.all

(async () => {
  
  try {
    await Promise.all([
      new Promise((resolve, reject) => {
        setTimeout(() => {
          reject(new Error('First task failed'));
        }, 5000);
      }),
      new Promise((resolve, reject) => {
        reject(new Error('Second task failed'));
      })
    ])
  } catch (e) {
    console.log(e.message); // Second task failed
  }
  
})()

require 'async'
require 'async/barrier'

begin
  Sync do |_task|
    barrier = Async::Barrier.new
  
    barrier.async do |_task|
      sleep 5
      raise 'First task failed'
    end
  
    barrier.async do |_task|
      raise 'Second task failed'
    end
  
    barrier.wait
  ensure
    barrier.stop
  end
rescue => e
  puts e.message # I expect "Second task failed", but I get "First task failed"
end

@ioquatix
Author

ioquatix commented Aug 3, 2024

Ah yes, the barrier is ordered, while Promise.all may be first come first served. It's because it's much easier to implement ordered wait. I agree it might not be the most desirable behaviour. Let me consider whether we can improve it.
