Reuse worker to scheduler WS connection for scheduling #8918

Open
avoloshko opened this issue Oct 30, 2024 · 4 comments

Comments

@avoloshko

I learned that right now for every task the scheduler creates a new connection, which may not be feasible when the worker is behind a firewall. Since WS communication from client to scheduler and from worker to scheduler already works, that connection could be reused for scheduling tasks. I understand that there may be some worker-to-worker communication, but it is important that the system works even when a worker is not directly reachable from the scheduler.

@fjetter
Member

fjetter commented Oct 30, 2024

> I learned that right now for every task the scheduler creates a new connection

This isn't true. We're opening a single connection between each worker and the scheduler. This connection is initiated from the worker. This connection is used in bidirectional communication to deal with all administrative traffic (e.g. assign tasks, notify about results). If there is a firewall blocking this, the worker cannot even register and would fail during startup.

If you are running into any issue you have to be more specific. There are a couple of specialized APIs where this is not true but ordinary scheduling will not open any connections to the worker that are initiated by the scheduler.
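
To make the "single worker-initiated, bidirectional connection" idea concrete, here is a minimal sketch using only Python's standard `asyncio` streams. It is not Dask's implementation: the JSON framing, the `run_demo` helper, and the `compute-task` / `task-finished` message shapes are assumptions chosen for illustration (only the `register-worker` op name appears in the logs in this thread). The point is that the "scheduler" side never dials out; it pushes work over the stream the "worker" opened.

```python
import asyncio
import json


async def run_demo():
    """Toy model: the worker dials the scheduler once, and both sides reuse that stream."""
    results = []
    done = asyncio.Event()

    async def handle_worker(reader, writer):
        # Scheduler side: accept the inbound connection and read the registration.
        registration = json.loads(await reader.readline())
        assert registration["op"] == "register-worker"
        # Push a task to the worker over the same, already established stream.
        task = {"op": "compute-task", "key": "square-3", "arg": 3}
        writer.write(json.dumps(task).encode() + b"\n")
        await writer.drain()
        # The result notification travels back over that stream as well.
        results.append(json.loads(await reader.readline()))
        writer.close()
        done.set()

    # Port 0 lets the OS pick a free port for the toy scheduler.
    server = await asyncio.start_server(handle_worker, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]

    async def worker():
        # Worker side: the only connect() call in the whole exchange.
        reader, writer = await asyncio.open_connection("127.0.0.1", port)
        hello = {"op": "register-worker", "name": "demo-worker"}
        writer.write(json.dumps(hello).encode() + b"\n")
        await writer.drain()
        task = json.loads(await reader.readline())
        reply = {"op": "task-finished", "key": task["key"], "value": task["arg"] ** 2}
        writer.write(json.dumps(reply).encode() + b"\n")
        await writer.drain()
        writer.close()
        await writer.wait_closed()

    async with server:
        await worker()
        await done.wait()
    return results


if __name__ == "__main__":
    print(asyncio.run(run_demo()))
```

In this model a firewall that only allows outbound traffic from the worker does not get in the way, because no connection is ever opened toward the worker.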

@avoloshko
Author

> I learned that right now for every task the scheduler creates a new connection
>
> This isn't true. We're opening a single connection between each worker and the scheduler. This connection is initiated from the worker. This connection is used in bidirectional communication to deal with all administrative traffic (e.g. assign tasks, notify about results). If there is a firewall blocking this, the worker cannot even register and would fail during startup.
>
> If you are running into any issue you have to be more specific. There are a couple of specialized APIs where this is not true but ordinary scheduling will not open any connections to the worker that are initiated by the scheduler.

Yes, indeed. I wasn't precise in describing the actual problem. I see that a worker initiates a WS connection with the scheduler and that it is used for administrative purposes. However, as far as I can see, there is another connection that is initiated by the scheduler, and that is the one that fails. The system starts up without any errors, but I see errors once the first job is scheduled.

Be aware that the IP addresses the processes report are local IP addresses. The scheduler is public, while the worker and the client are behind a firewall.

Scheduler started:

2024-10-30 09:08:30,210 - distributed.scheduler - INFO - State start
2024-10-30 09:08:30,211 - distributed.diskutils - DEBUG - Locking '/tmp/dask-scratch-space/scheduler-2dxhm73x.dirlock'...
2024-10-30 09:08:30,213 - distributed.scheduler - INFO - -----------------------------------------------
2024-10-30 09:08:30,213 - distributed.scheduler - DEBUG - Clear task state
2024-10-30 09:08:30,214 - distributed.scheduler - INFO -   Scheduler at:   ws://172.31.29.175:8786
2024-10-30 09:08:30,214 - distributed.scheduler - INFO -   dashboard at:  http://172.31.29.175:8787/status
2024-10-30 09:08:30,214 - distributed.scheduler - INFO - Registering Worker plugin shuffle
2024-10-30 09:08:32,214 - distributed.active_memory_manager - DEBUG - Running policy: ReduceReplicas()
2024-10-30 09:08:32,214 - distributed.active_memory_manager - DEBUG - Active Memory Manager run in 0ms

Worker started:

warnings.warn(
2024-10-30 09:08:50,710 - distributed.nanny - INFO -         Start Nanny at: 'ws://172.28.0.12:38255'
2024-10-30 09:08:51,937 - distributed.worker - INFO -       Start worker at:     ws://172.28.0.12:46663
2024-10-30 09:08:51,937 - distributed.worker - INFO -          Listening to:     ws://172.28.0.12:46663
2024-10-30 09:08:51,937 - distributed.worker - INFO -          dashboard at:          172.28.0.12:39265
2024-10-30 09:08:51,937 - distributed.worker - INFO - Waiting to connect to:   ws://16.170.220.238:8786
2024-10-30 09:08:51,937 - distributed.worker - INFO - -------------------------------------------------
2024-10-30 09:08:51,937 - distributed.worker - INFO -               Threads:                          2
2024-10-30 09:08:51,937 - distributed.worker - INFO -                Memory:                  12.67 GiB
2024-10-30 09:08:51,937 - distributed.worker - INFO -       Local Directory: /tmp/dask-scratch-space/worker-5hl_si0p
2024-10-30 09:08:51,938 - distributed.worker - INFO - -------------------------------------------------
2024-10-30 09:08:53,231 - distributed.worker - WARNING - Mismatched versions found

+---------+---------------------------------------------+-----------------+-----------------+
| Package | Worker-5141a793-490c-4246-b8d7-c58c050686cc | Scheduler       | Workers         |
+---------+---------------------------------------------+-----------------+-----------------+
| python  | 3.10.12.final.0                             | 3.10.15.final.0 | 3.10.12.final.0 |
+---------+---------------------------------------------+-----------------+-----------------+
2024-10-30 09:08:53,232 - distributed.worker - INFO - Starting Worker plugin shuffle
2024-10-30 09:08:53,233 - distributed.worker - INFO -         Registered to:   ws://16.170.220.238:8786
2024-10-30 09:08:53,233 - distributed.worker - INFO - -------------------------------------------------
2024-10-30 09:08:53,234 - distributed.core - INFO - Starting established connection to ws://16.170.220.238:8786

Scheduler received a connection from the worker:

2024-10-30 09:08:50,649 - distributed.core - DEBUG - Connection from '104.197.153.164:0' to Scheduler
2024-10-30 09:08:50,649 - distributed.core - DEBUG - Message from '104.197.153.164:0': {'op': 'register_nanny', 'address': 'ws://172.28.0.12:38255'}
2024-10-30 09:08:50,649 - distributed.core - DEBUG - Calling into handler add_nanny
2024-10-30 09:08:52,215 - distributed.active_memory_manager - DEBUG - Running policy: ReduceReplicas()
2024-10-30 09:08:52,215 - distributed.active_memory_manager - DEBUG - Active Memory Manager run in 0ms
2024-10-30 09:08:52,408 - distributed.core - DEBUG - Connection from '104.197.153.164:0' to Scheduler
2024-10-30 09:08:52,825 - distributed.core - DEBUG - Message from '104.197.153.164:0': {'op': 'register-worker', 'reply': False, 'address': 'ws://172.28.0.12:46663', 'status': 'init', 'nthreads': 2, 'name': 'ws://172.28.0.12:46663', 'now': 1730279332.185576, 'resources': {}, 'memory_limit': 13609431040, 'local_directory': '/tmp/dask-scratch-space/worker-5hl_si0p', 'services': {'dashboard': 39265}, 'nanny': 'ws://172.28.0.12:38255', 'pid': 61898, 'versions': {'host': {'python': '3.10.12.final.0', 'python-bits': 64, 'OS': 'Linux', 'OS-release': '6.1.85+', 'machine': 'x86_64', 'processor': 'x86_64', 'byteorder': 'little', 'LC_ALL': 'en_US.UTF-8', 'LANG': 'en_US.UTF-8'}, 'packages': {'python': '3.10.12.final.0', 'dask': '2024.10.0', 'distributed': '2024.10.0', 'msgpack': '1.1.0', 'cloudpickle': '3.1.0', 'tornado': '6.4.1', 'toolz': '1.0.0', 'numpy': '1.26.4', 'pandas': '2.2.3', 'lz4': '4.3.3'}}, 'metrics': {'task_counts': {}, 'bandwidth': {'total': 100000000, 'workers': {}, 'types': {}}, 'digests_total_since_heartbeat': {}, 'managed_bytes': 0, 'spilled_bytes': {'memory': 0, 'disk': 0}, 'transfer': {'incoming_bytes': 0, 'incoming_count': 0, 'incoming_count_total': 0, 'outgoing_bytes': 0, 'outgoing_count': 0, 'outgoing_count_total': 0}, 'event_loop_interval': 0.02, 'cpu': 0.0, 'memory': 71446528, 'time': 1730279331.4066799, 'host_net_io': {'read_bps': 0.0, 'write_bps': 0.0}, 'host_disk_io': {'read_bps': 0.0, 'write_bps': 0.0}, 'num_fds': 17}, 'extra': {}, 'stimulus_id': 'worker-connect-1730279332.6426144', 'server_id': 'Worker-5141a793-490c-4246-b8d7-c58c050686cc'}
2024-10-30 09:08:52,825 - distributed.core - DEBUG - Calling into handler add_worker
2024-10-30 09:08:52,826 - distributed.scheduler - INFO - Register worker <WorkerState 'ws://172.28.0.12:46663', status: init, memory: 0, processing: 0>
2024-10-30 09:08:53,171 - distributed.scheduler - INFO - Starting worker compute stream, ws://172.28.0.12:46663
2024-10-30 09:08:53,171 - distributed.core - INFO - Starting established connection to 104.197.153.164:0
2024-10-30 09:08:53,415 - distributed.scheduler - DEBUG - Worker status init -> running - <WorkerState 'ws://172.28.0.12:46663', status: running, memory: 0, processing: 0>
2024-10-30 09:08:53,469 - distributed.core - DEBUG - Lost connection to '104.197.153.164:0' while sending result for op 'register_nanny': 
2024-10-30 09:08:54,215 - distributed.active_memory_manager - DEBUG - Running policy: ReduceReplicas()
2024-10-30 09:08:54,215 - distributed.active_memory_manager - DEBUG - Ac

The client submits a task and, after a while, sees the following messages:

  warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))
2024-10-30 05:10:06,345 - distributed.client - WARNING - Couldn't gather 1 keys, rescheduling ('square-af5a1d63b1e77baf3516edcc200f0ac2',)
2024-10-30 05:10:23,835 - distributed.client - WARNING - Couldn't gather 1 keys, rescheduling ('square-af5a1d63b1e77baf3516edcc200f0ac2',)

The scheduler sees the following errors:

2024-10-30 09:10:06,346 - distributed.comm.core - DEBUG - Could not connect to 172.28.0.12:, waiting for 0.0 before retrying
2024-10-30 09:10:06,346 - distributed.scheduler - ERROR - Couldn't gather keys: {'square-af5a1d63b1e77baf3516edcc200f0ac2': 'memory'}
2024-10-30 09:10:06,463 - distributed.core - DEBUG - Message from '104.197.153.164:0': {'op': 'heartbeat_worker', 'address': 'ws://172.28.0.12:46663', 'now': 1730279406.2345676, 'metrics': {'task_counts': {'memory': 1}, 'bandwidth': {'total': 100000000, 'workers': {}, 'types': {}}, 'digests_total_since_heartbeat': {'tick-duration': 0.49944448471069336, 'latency': 0.2906045913696289}, 'managed_bytes': 28, 'spilled_bytes': {'memory': 0, 'disk': 0}, 'transfer': {'incoming_bytes': 0, 'incoming_count': 0, 'incoming_count_total': 0, 'outgoing_bytes': 0, 'outgoing_count': 0, 'outgoing_count_total': 0}, 'event_loop_interval': 0.02001708984375, 'cpu': 4.0, 'memory': 158621696, 'time': 1730279405.7359006, 'host_net_io': {'read_bps': 811.0278805131314, 'write_bps': 1774.248647246011}, 'host_disk_io': {'read_bps': 0.0, 'write_bps': 0.0}, 'num_fds': 20}, 'executing': {}, 'extensions': {'spans': {}, 'shuffle': {}}, 'reply': True}
2024-10-30 09:10:06,463 - distributed.core - DEBUG - Calling into handler heartbeat_worker
2024-10-30 09:10:06,841 - distributed.core - DEBUG - Message from '99.238.169.96:0': {'op': 'gather', 'keys': ('square-af5a1d63b1e77baf3516edcc200f0ac2',), 'reply': True}
2024-10-30 09:10:06,842 - distributed.core - DEBUG - Calling into handler gather
2024-10-30 09:10:06,842 - distributed.comm.core - DEBUG - Establishing connection to 172.28.0.12:46663
2024-10-30 09:10:06,962 - distributed.core - DEBUG - Message from '104.197.153.164:0': {'op': 'heartbeat_worker', 'address': 'ws://172.28.0.12:46663', 'now': 1730279406.7342885, 'metrics': {'task_counts': {'memory': 1}, 'bandwidth': {'total': 100000000, 'workers': {}, 'types': {}}, 'digests_total_since_heartbeat': {'tick-duration': 0.4997103214263916, 'latency': 0.29076552391052246}, 'managed_bytes': 28, 'spilled_bytes': {'memory': 0, 'disk': 0}, 'transfer': {'incoming_bytes': 0, 'incoming_count': 0, 'incoming_count_total': 0, 'outgoing_bytes': 0, 'outgoing_count': 0, 'outgoing_count_total': 0}, 'event_loop_interval': 0.019984498023986816, 'cpu': 6.0, 'memory': 158625792, 'time': 1730279406.2352908, 'host_net_io': {'read_bps': 31270.12509938929, 'write_bps': 32233.299418856906}, 'host_disk_io': {'read_bps': 0.0, 'write_bps': 0.0}, 'num_fds': 20}, 'executing': {}, 'extensions': {'spans': {}, 'shuffle': {}}, 'reply': True}
2024-10-30 09:10:06,962 - distributed.core - DEBUG - Calling into handler heartbeat_worker
2024-10-30 09:10:07,463 - distributed.core - DEBUG - Message from '104.197.153.164:0': {'op': 'heartbeat_worker', 'address': 'ws://172.28.0.12:46663', 'now': 1730279407.2351818, 'metrics': {'task_counts': {'memory': 1}, 'bandwidth': {'total': 100000000, 'workers': {}, 'types': {}}, 'digests_total_since_heartbeat': {'tick-duration': 0.5006906986236572, 'latency': 0.2901310920715332}, 'managed_bytes': 28, 'spilled_bytes': {'memory': 0, 'disk': 0}, 'transfer': {'incoming_bytes': 0, 'incoming_count': 0, 'incoming_count_total': 0, 'outgoing_bytes': 0, 'outgoing_count': 0, 'outgoing_count_total': 0}, 'event_loop_interval': 0.019984498023986816, 'cpu': 4.0, 'memory': 158625792, 'time': 1730279406.734969, 'host_net_io': {'read_bps': 27673.825541326492, 'write_bps': 28636.445593783683}, 'host_disk_io': {'read_bps': 0.0, 'write_bps': 0.0}, 'num_fds': 20}, 'executing': {}, 'extensions': {'spans': {}, 'shuffle': {}}, 'reply': True}
2024-10-30 09:10:07,463 - distributed.core - DEBUG - Calling into handler heartbeat_worker
2024-10-30 09:10:07,950 - distributed.core - DEBUG - Message from

So in this particular case, the worker expects the scheduler to connect to ws://172.28.0.12:46663, and that will never work as long as port 46663 remains closed. Do I understand this correctly?
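
A quick way to see why that address is a problem is to classify the addresses from the logs with the standard library. This is only a sketch: `describe_address` is a hypothetical helper, and it assumes the host part is a literal IP address (a hostname would make `ipaddress.ip_address` raise `ValueError`).

```python
import ipaddress
from urllib.parse import urlsplit


def describe_address(address: str) -> dict:
    """Split an address such as 'ws://172.28.0.12:46663' and classify its host."""
    parts = urlsplit(address)
    # Assumption: the host is a literal IP address, as in the logs in this thread.
    ip = ipaddress.ip_address(parts.hostname)
    return {"host": parts.hostname, "port": parts.port, "private": ip.is_private}


if __name__ == "__main__":
    # Address the worker registers with, and the scheduler address the worker dials.
    for addr in ("ws://172.28.0.12:46663", "ws://16.170.220.238:8786"):
        print(addr, describe_address(addr))
```

The worker's registered address falls in the private 172.16.0.0/12 range, so a scheduler on the public internet has no route to it, independent of whether port 46663 is open on the worker's own host.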

@avoloshko
Author

> I learned that right now for every task the scheduler creates a new connection
>
> This isn't true. We're opening a single connection between each worker and the scheduler. This connection is initiated from the worker. This connection is used in bidirectional communication to deal with all administrative traffic (e.g. assign tasks, notify about results). If there is a firewall blocking this, the worker cannot even register and would fail during startup.
>
> If you are running into any issue you have to be more specific. There are a couple of specialized APIs where this is not true but ordinary scheduling will not open any connections to the worker that are initiated by the scheduler.

@fjetter I tried to provide logs that are as detailed as possible. Long story short, the scheduler tries to connect to 172.28.0.12, which is the private IP of the worker. The worker expects connections on the address it reports as "Listening to: ws://172.28.0.12:46663", and that is what I wanted to avoid, since it requires a connection from the scheduler to the worker.

Thank you

@jacobtomlinson
Member

jacobtomlinson commented Oct 31, 2024

It looks to me like the connection is being lost during worker registration, and then the scheduler is trying to resume the connection and failing. So when you say "the system starts up without any error" I don't think that's the case.

2024-10-30 09:08:53,469 - distributed.core - DEBUG - Lost connection to '104.197.153.164:0' while sending result for op 'register_nanny': 

Dask is designed in a way where the scheduler and workers can all open network connections between each other at any time. If you're running in an environment where a firewall is blocking such a connection then Dask will not behave as expected. Is it feasible to move the scheduler to the same location as your workers?
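
One way to check whether a given environment meets that requirement is a plain TCP probe, run from the scheduler host toward the address a worker registers with (and the other way round). The sketch below uses only the standard library; `can_connect` is a hypothetical helper, not part of Dask, and it only tests that a TCP connection can be opened, not that a WebSocket handshake succeeds.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, unreachable networks, and timeouts.
        return False


if __name__ == "__main__":
    # Worker address taken from the logs in this thread; run this on the scheduler host.
    print(can_connect("172.28.0.12", 46663))
```

If the probe fails in either direction, operations that make the scheduler or another worker dial a worker, such as the `gather` call in the logs above, can be expected to fail in the same way.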
