diff --git a/.github/workflows/erlang.yml b/.github/workflows/erlang.yml index 2841ddd..6bab87e 100644 --- a/.github/workflows/erlang.yml +++ b/.github/workflows/erlang.yml @@ -6,37 +6,37 @@ jobs: build: - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 strategy: matrix: - otp: ['23.3', '24.0'] - rebar: ['3.16.1'] + otp: ['25', '26'] + rebar: ['3.22'] steps: - - uses: actions/checkout@v2 - - uses: erlef/setup-beam@v1 - id: setup-beam - with: - otp-version: ${{matrix.otp}} - rebar3-version: ${{matrix.rebar}} - - name: Restore _build - uses: actions/cache@v2 - with: - path: _build - key: _build-cache-for-os-${{runner.os}}-otp-${{steps.setup-beam.outputs.otp-version}}-rebar3-${{steps.setup-beam.outputs.rebar3-version}}-hash-${{hashFiles('rebar.lock')}} - - name: Restore rebar3's cache - uses: actions/cache@v2 - with: - path: ~/.cache/rebar3 - key: rebar3-cache-for-os-${{runner.os}}-otp-${{steps.setup-beam.outputs.otp-version}}-rebar3-${{steps.setup-beam.outputs.rebar3-version}}-hash-${{hashFiles('rebar.lock')}} - - name: Compile - run: rebar3 compile - - name: Format check - run: rebar3 format --verify - - name: Run tests and verifications - run: rebar3 test - - name: Upload code coverage - uses: codecov/codecov-action@v1 - with: - file: "_build/test/covertool/worker_pool.covertool.xml" + - uses: actions/checkout@v3 + - uses: erlef/setup-beam@v1 + id: setup-beam + with: + otp-version: ${{matrix.otp}} + rebar3-version: ${{matrix.rebar}} + - name: Restore _build + uses: actions/cache@v3 + with: + path: _build + key: _build-cache-for-os-${{runner.os}}-otp-${{steps.setup-beam.outputs.otp-version}}-rebar3-${{steps.setup-beam.outputs.rebar3-version}}-hash-${{hashFiles('rebar.lock')}} + - name: Restore rebar3's cache + uses: actions/cache@v3 + with: + path: ~/.cache/rebar3 + key: rebar3-cache-for-os-${{runner.os}}-otp-${{steps.setup-beam.outputs.otp-version}}-rebar3-${{steps.setup-beam.outputs.rebar3-version}}-hash-${{hashFiles('rebar.lock')}} + - name: Compile + run: rebar3 compile + - name: Format check + run: rebar3 format --verify + - name: Run tests and verifications + run: rebar3 test + - name: Upload code coverage + uses: codecov/codecov-action@v3 + with: + file: "_build/test/covertool/worker_pool.covertool.xml" diff --git a/.gitignore b/.gitignore index 4fbd7e1..f40a53d 100644 --- a/.gitignore +++ b/.gitignore @@ -1,14 +1,6 @@ rebar3.crashdump +.rebar3 doc/ -codecov.json -_build/ -all.coverdata -doc -.DS_Store +_* erl_crash.dump -*.beam -*.log -*~ -.idea -*.iml -*.orig +logs diff --git a/LICENSE b/LICENSE index dc3b9af..6fa7e10 100644 --- a/LICENSE +++ b/LICENSE @@ -1,7 +1,7 @@ Apache License Version 2.0, January 2004 - http://www.apache.org/licenses/ + https://www.apache.org/licenses/ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION @@ -193,7 +193,7 @@ you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + https://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/README.md b/README.md index 403d6e9..2d0ea69 100644 --- a/README.md +++ b/README.md @@ -1,47 +1,45 @@ -# Worker Pool [![Build Status](https://travis-ci.org/inaka/worker_pool.svg?branch=main)](https://travis-ci.org/inaka/worker_pool)[![codecov](https://codecov.io/gh/inaka/worker_pool/branch/main/graph/badge.svg)](https://codecov.io/gh/inaka/worker_pool) +# Worker Pool [![Build Status](https://github.com/inaka/worker_pool/actions/workflows/erlang.yml/badge.svg)](https://github.com/inaka/worker_pool/actions/workflows/erlang.yml)[![codecov](https://codecov.io/gh/inaka/worker_pool/branch/main/graph/badge.svg)](https://codecov.io/gh/inaka/worker_pool) - + A pool of gen servers. -### Abstract +## Abstract The goal of **worker pool** is pretty straightforward: To provide a transparent way to manage a pool of workers and _do the best effort_ in balancing the load among them distributing the tasks requested to the pool. -### Documentation +## Documentation -The documentation can be generated from code using [edoc](http://www.erlang.org/doc/apps/edoc/chapter.html) with ``rebar3 edoc`` or using [erldocs](https://github.com/erldocs/erldocs) with ``make erldocs``. It is also available online [here](https://hexdocs.pm/worker_pool/) +The documentation can be generated from code using [rebar3_ex_doc](https://github.com/starbelly/rebar3_ex_doc) with `rebar3 ex_doc`. It is also available online [here](https://hexdocs.pm/worker_pool/) -### Usage +## Usage All user functions are exposed through the [wpool module](https://hexdocs.pm/worker_pool/wpool.html). -#### Starting the Application -**Worker Pool** is an erlang application that can be started using the functions in the [`application`](http://erldocs.com/current/kernel/application.html) module. For convenience, `wpool:start/0` and `wpool:stop/0` are also provided. +### Starting the Application + +**Worker Pool** is an erlang application that can be started using the functions in the [`application`](https://erldocs.com/current/kernel/application.html) module. For convenience, `wpool:start/0` and `wpool:stop/0` are also provided. + +### Starting a Pool -#### Starting a Pool To start a new worker pool, you can either use `wpool:start_pool` (if you want to supervise it yourself) or `wpool:start_sup_pool` (if you want the pool to live under wpool's supervision tree). You can provide several options on any of those calls: * **overrun_warning**: The number of milliseconds after which a task is considered *overrun* (i.e. delayed) so a warning is emitted using **overrun_handler**. The task is monitored until it is finished, thus more than one warning might be emitted for a single task. The rounds of warnings are not equally timed, an exponential backoff algorithm is used instead: after each warning the overrun time is doubled (i.e. with `overrun_warning = 1000` warnings would be emitted after 1000, 2000, 4000, 8000 ...) The default value for this setting is `infinity` (i.e. no warnings are emitted) * **max_overrun_warnings**: The maximum number of overrun warnings emitted before killing a delayed task: that is, killing the worker running the task. If this parameter is set to a value other than `infinity` the rounds of warnings becomes equally timed (i.e. with `overrun_warning = 1000` and `max_overrun_warnings = 5` the task would be killed after 5 seconds of execution) The default value for this setting is `infinity` (i.e. delayed tasks are not killed) - **NOTE:** As the worker is being killed it might cause worker's messages to be missing if you are using a worker stategy other than `available_worker` (see worker strategies below) - * **overrun_handler**: The module and function to call when a task is *overrun*. The default value for this setting is `{error_logger, warning_report}`. Repor values are: - - * *{alert, AlertType}*: Where `AlertType` is `overrun` on regular warnings, or `max_overrun_limit` when the worker is about to be killed. - * *{pool, Pool}*: The poolname - * *{worker, Pid}*: Pid of the worker - * *{task, Task}*: A description of the task - * *{runtime, Runtime}*: The runtime of the current round - + * *{alert, AlertType}*: Where `AlertType` is `overrun` on regular warnings, or `max_overrun_limit` when the worker is about to be killed. + * *{pool, Pool}*: The poolname + * *{worker, Pid}*: Pid of the worker + * *{task, Task}*: A description of the task + * *{runtime, Runtime}*: The runtime of the current round * **workers**: The number of workers in the pool. The default value for this setting is `100` * **worker_type**: The type of the worker. The available values are `gen_server`. The default value is `gen_server`. Eventually we'll add `gen_statem` as well. -* **worker**: The [`gen_server`](http://erldocs.com/current/stdlib/gen_server.html) module that each worker will run and the `InitArgs` to use on the corresponding `start_link` call used to initiate it. The default value for this setting is `{wpool_worker, undefined}`. That means that if you don't provide a worker implementation, the pool will be generated with this default one. [`wpool_worker`](https://hexdocs.pm/worker_pool/wpool_worker.html) is a module that implements a very simple RPC-like interface. +* **worker**: The [`gen_server`](https://erldocs.com/current/stdlib/gen_server.html) module that each worker will run and the `InitArgs` to use on the corresponding `start_link` call used to initiate it. The default value for this setting is `{wpool_worker, undefined}`. That means that if you don't provide a worker implementation, the pool will be generated with this default one. [`wpool_worker`](https://hexdocs.pm/worker_pool/wpool_worker.html) is a module that implements a very simple RPC-like interface. * **worker_opt**: Options that will be passed to each `gen_server` worker. This are the same as described at `gen_server` documentation. * **worker_shutdown**: The `shutdown` option to be used in the child specs of the workers. Defaults to `5000`. * **strategy**: Not the worker selection strategy (discussed below) but the supervisor flags to be used in the supervisor over the individual workers (`wpool_process_sup`). Defaults to `{one_for_one, 5, 60}` @@ -53,34 +51,43 @@ To start a new worker pool, you can either use `wpool:start_pool` (if you want t * **callbacks**: Initial list of callback modules implementing `wpool_process_callbacks` to be called on certain worker events. This options will only work if the `enable_callbacks` is set to **true**. Callbacks can be added and removed later by `wpool_pool:add_callback_module/2` and `wpool_pool:remove_callback_module/2`. -#### Using the Workers +### Using the Workers + Since the workers are `gen_server`s, messages can be `call`ed or `cast`ed to them. To do that you can use `wpool:call` and `wpool:cast` as you would use the equivalent functions on `gen_server`. -##### Choosing a Strategy +#### Choosing a Strategy + Beyond the regular parameters for `gen_server`, wpool also provides an extra optional parameter: **Strategy**. -The strategy used to pick up the worker to perform the task. If not provided, the result of `wpool:default_strategy/0` is used. The available strategies are defined in the `wpool:strategy/0` type and also described below: +The strategy used to pick up the worker to perform the task. If not provided, the result of `wpool:default_strategy/0` is used. The available strategies are defined in the `t:wpool:strategy/0` type and also described below: + +##### best_worker + +Picks the worker with the smaller queue of messages. Loosely based on [this article](https://lethain.com/load-balancing-across-erlang-process-groups/). This strategy is usually useful when your workers always perform the same task, or tasks with expectedly similar runtimes. -###### best_worker -Picks the worker with the smaller queue of messages. Loosely based on [this article](http://lethain.com/load-balancing-across-erlang-process-groups/). This strategy is usually useful when your workers always perform the same task, or tasks with expectedly similar runtimes. +##### random_worker -###### random_worker Just picks a random worker. This strategy is the fastest one when to select a worker. It's ideal if your workers will perform many short tasks. -###### next_worker +##### next_worker + Picks the next worker in a round-robin fashion. That ensures evenly distribution of tasks. -###### available_worker +##### available_worker + Instead of just picking one of the workers in the queue and sending the request to it, this strategy queues the request and waits until a worker is available to perform it. That may render the worker selection part of the process much slower (thus generating the need for an additional parameter: **Worker_Timeout** that controls how many milliseconds is the client willing to spend in that, regardless of the global **Timeout** for the call). This strategy ensures that, if a worker crashes, no messages are lost in its message queue. It also ensures that, if a task takes too long, that doesn't block other tasks since, as soon as other worker is free it can pick up the next task in the list. -###### next_available_worker +##### next_available_worker + In a way, this strategy behaves like `available_worker` in the sense that it will pick the first worker that it can find which is not running any task at the moment, but the difference is that it will fail if all workers are busy. -###### hash_worker +##### hash_worker + This strategy takes a key and selects a worker using [`erlang:phash2/2`](https://www.erlang.org/doc/man/erlang.html#phash-2). This ensures that tasks classified under the same key will be delivered to the same worker, which is useful to classify events by key and work on them sequentially on the worker, distributing different keys across different workers. -#### Broadcasting a Pool +### Broadcasting a Pool + Wpool provides a way to `broadcast` a message to every worker within the given Pool. ```erlang @@ -97,27 +104,32 @@ ok **NOTE:** This messages don't get queued, they go straight to the worker's message queues, so if you're using available_worker strategy to balance the charge and you have some tasks queued up waiting for the next available worker, the broadcast will reach all the workers **before** the queued up tasks. -#### Watching a Pool +### Watching a Pool + Wpool provides a way to get live statistics about a pool. To do that, you can use `wpool:stats/1`. -#### Stopping a Pool -To stop a pool, just use `wpool:stop/1`. +### Stopping a Pool + +To stop a pool, just use `wpool:stop_pool/1`. -### Examples +## Examples To see how `wpool` is used you can check the [test](test) folder where you'll find many different scenarios exercised in the different suites. -If you want to see **worker_pool** in a _real life_ project, I recommend you to check [sumo_db](https://github.com/inaka/sumo_db), another open-source library from [Inaka](http://inaka.github.io/) that uses wpool intensively. +If you want to see **worker_pool** in a _real life_ project, I recommend you to check [sumo_db](https://github.com/inaka/sumo_db), another open-source library from [Inaka](https://inaka.github.io/) that uses wpool intensively. + +## Benchmarks -### Benchmarks +**wpool** comes with a very basic [benchmarker](https://github.com/inaka/worker_pool/blob/main/test/wpool_bench.erl) that let's you compare different strategies against the default `wpool_worker`. If you want to do the same in your project, you can use `wpool_bench` as a template and replace the worker and the tasks by your own ones. -**wpool** comes with a very basic [benchmarker](test/wpool_bench.erl) that let's you compare different strategies against the default `wpool_worker`. If you want to do the same in your project, you can use `wpool_bench` as a template and replace the worker and the tasks by your own ones. +## Contact Us -### Contact Us If you find any **bugs** or have a **problem** while using this library, please [open an issue](https://github.com/inaka/worker_pool/issues/new) in this repo (or a pull request :)). -### On Hex.pm +## On Hex.pm + Worker Pool is available on [Hex.pm](https://hex.pm/packages/worker_pool). -### Requirements -**Required OTP version 23** or or higher. We only provide guarantees that the system runs on `OTP23+` since that's what we're testing it in, but the `minimum_otp_vsn` is `"21"` because some systems where **worker_pool** is integrated do require it. +## Requirements + +**Required OTP version 25** or higher. We only provide guarantees that the system runs on `OTP25+` since that's what we're testing it in, but the `minimum_otp_vsn` is `"21"` because some systems where **worker_pool** is integrated do require it. diff --git a/priv/overview.edoc b/priv/overview.edoc deleted file mode 100644 index 004abb2..0000000 --- a/priv/overview.edoc +++ /dev/null @@ -1,6 +0,0 @@ -** this is the overview.doc file for Worker Pool ** - -@author Brujo -@title Worker Pool -@doc This project goal is simple: It's just a pool of Gen Servers. -@reference See our README for more information. diff --git a/rebar.config b/rebar.config index 72ba44d..4e2b1a0 100644 --- a/rebar.config +++ b/rebar.config @@ -1,87 +1,68 @@ -%% -*- mode: erlang;erlang-indent-level: 2;indent-tabs-mode: nil -*- -%% ex: ts=4 sw=4 ft=erlang et +%% == Compiler and Profiles == -%% == Erlang Compiler == -{minimum_otp_vsn, "21"}. - -%% Erlang compiler options {erl_opts, - [warn_unused_vars, - ewarn_export_all, - warn_shadow_vars, - warn_unused_import, - warn_unused_function, - warn_bif_clash, - warn_unused_record, - warn_deprecated_function, - warn_obsolete_guard, - strict_validation, - warn_export_vars, - warn_exported_vars, - warn_missing_spec, - warn_untyped_record, - debug_info]}. + [warn_unused_import, warn_export_vars, warnings_as_errors, verbose, report, debug_info]}. + +{minimum_otp_vsn, "21"}. {profiles, [{test, - [{deps, [{katana, "1.0.0"}, {mixer, "1.2.0", {pkg, inaka_mixer}}, {meck, "0.9.2"}]}]}]}. - -{ct_compile_opts, - [warn_unused_vars, - warn_export_all, - warn_shadow_vars, - warn_unused_import, - warn_unused_function, - warn_bif_clash, - warn_unused_record, - warn_deprecated_function, - warn_obsolete_guard, - strict_validation, - warn_export_vars, - warn_exported_vars, - warn_missing_spec, - warn_untyped_record, - debug_info]}. - -{ct_opts, []}. - -{ct_extra_params, - "-no_auto_compile -dir ebin -logdir log/ct --erl_args -smp enable -boot start_sasl"}. - -{edoc_opts, [{todo, true}, {overview, "priv/overview.edoc"}]}. - -{dialyzer, - [{warnings, - [race_conditions, unknown, no_return, unmatched_returns, error_handling, underspecs]}]}. - -{project_plugins, - [{rebar3_hex, "~> 6.11.7"}, - {rebar3_format, "~> 1.0.1"}, - {rebar3_lint, "~> 0.5.0"}, - {rebar3_hank, "~> 1.2.2"}, - rebar3_depup, - covertool]}. - -{cover_enabled, true}. - -{cover_export_enabled, true}. - -{cover_opts, [verbose]}. - -{covertool, [{coverdata_files, ["ct.coverdata"]}]}. + [{ct_extra_params, + "-no_auto_compile -dir ebin -logdir log/ct --erl_args -smp enable -boot start_sasl"}, + {cover_enabled, true}, + {cover_export_enabled, true}, + {cover_opts, [verbose]}, + {ct_opts, [{verbose, true}]}, + {deps, [{katana, "1.0.0"}, {mixer, "1.2.0", {pkg, inaka_mixer}}, {meck, "0.9.2"}]}, + {dialyzer, + [{warnings, [no_return, unmatched_returns, error_handling, underspecs, unknown]}, + {plt_extra_apps, [common_test, katana, meck]}]}]}]}. {alias, [{test, [compile, format, - lint, hank, + lint, + xref, dialyzer, - {ct, "--verbose"}, + ct, cover, {covertool, "generate"}, - edoc]}]}. + ex_doc]}]}. + +{covertool, [{coverdata_files, ["ct.coverdata"]}]}. + +%% == Dependencies and plugins == + +{project_plugins, + [{rebar3_hank, "~> 1.4.0"}, + {rebar3_hex, "~> 7.0.7"}, + {rebar3_format, "~> 1.3.0"}, + {rebar3_lint, "~> 3.0.1"}, + {rebar3_ex_doc, "0.2.18"}, + {rebar3_depup, "~> 0.3.1"}, + {covertool, "~> 2.0.6"}]}. + +%% == Documentation == + +{ex_doc, + [{source_url, <<"https://github.com/inaka/worker_pool">>}, + {extras, [<<"README.md">>, <<"LICENSE">>]}, + {main, <<"readme">>}]}. + +{hex, [{doc, #{provider => ex_doc}}]}. + +%% == Format == {format, [{files, ["*.config", "src/*", "test/*"]}]}. -{hex, [{doc, #{provider => edoc}}]}. +%% == Dialyzer + XRef == + +{dialyzer, + [{warnings, [no_return, unmatched_returns, error_handling, underspecs, unknown]}]}. + +{xref_checks, + [undefined_function_calls, deprecated_function_calls, deprecated_functions]}. + +{xref_extra_paths, ["test/**"]}. diff --git a/src/worker_pool.app.src b/src/worker_pool.app.src index 8001f59..b752e7e 100644 --- a/src/worker_pool.app.src +++ b/src/worker_pool.app.src @@ -5,7 +5,7 @@ % except in compliance with the License. You may obtain % a copy of the License at % -% http://www.apache.org/licenses/LICENSE-2.0 +% https://www.apache.org/licenses/LICENSE-2.0 % % Unless required by applicable law or agreed to in writing, % software distributed under the License is distributed on an @@ -26,7 +26,7 @@ {env, []}, {licenses, ["Apache2"]}, {links, - [{"Github", "https://github.com/inaka/worker_pool"}, + [{"GitHub", "https://github.com/inaka/worker_pool"}, {"Blog Post", - "https://web.archive.org/web/20170602054156/http://inaka.net/blog/2014/09/25/worker-pool/"}]}, + "https://web.archive.org/web/20170602054156/https://inaka.net/blog/2014/09/25/worker-pool/"}]}, {build_tools, ["rebar3"]}]}. diff --git a/src/wpool.erl b/src/wpool.erl index 943e935..662054c 100644 --- a/src/wpool.erl +++ b/src/wpool.erl @@ -3,7 +3,7 @@ % except in compliance with the License. You may obtain % a copy of the License at % -% http://www.apache.org/licenses/LICENSE-2.0 +% https://www.apache.org/licenses/LICENSE-2.0 % % Unless required by applicable law or agreed to in writing, % software distributed under the License is distributed on an @@ -37,7 +37,9 @@ {pool_sup_intensity, non_neg_integer()} | {pool_sup_shutdown, brutal_kill | timeout()} | {pool_sup_period, non_neg_integer()} | - {queue_type, wpool_queue_manager:queue_type()}. + {queue_type, wpool_queue_manager:queue_type()} | + {enable_callbacks, boolean()} | + {callbacks, [module()]}. -type custom_strategy() :: fun(([atom()]) -> Atom :: atom()). -type strategy() :: best_worker | @@ -125,7 +127,7 @@ stop_pool(Name) -> start_sup_pool(Name) -> start_sup_pool(Name, []). -%% @doc Starts a pool of N wpool_processes supervised by {@link wpool_sup} +%% @doc Starts a pool of N wpool_processes supervised by `wpool_sup' -spec start_sup_pool(name(), [option()]) -> {ok, pid()} | {error, {already_started, pid()} | term()}. start_sup_pool(Name, Options) -> @@ -192,19 +194,20 @@ cast(Sup, Cast, Strategy) -> wpool_pool:Strategy(Sup), Cast). %% @equiv send_request(Sup, Call, default_strategy(), 5000) --spec send_request(name(), term()) -> noproc | timeout | reference(). +-spec send_request(name(), term()) -> noproc | timeout | gen_server:request_id(). send_request(Sup, Call) -> send_request(Sup, Call, default_strategy()). %% @equiv send_request(Sup, Call, Strategy, 5000) --spec send_request(name(), term(), strategy()) -> noproc | timeout | reference(). +-spec send_request(name(), term(), strategy()) -> + noproc | timeout | gen_server:request_id(). send_request(Sup, Call, Strategy) -> send_request(Sup, Call, Strategy, 5000). %% @doc Picks a server and issues the call to it. %% Timeout applies only for the time used choosing a worker in the available_worker strategy -spec send_request(name(), term(), strategy(), timeout()) -> - noproc | timeout | reference(). + noproc | timeout | gen_server:request_id(). send_request(Sup, Call, available_worker, Timeout) -> wpool_pool:send_request_available_worker(Sup, Call, Timeout); send_request(Sup, Call, {hash_worker, HashKey}, _Timeout) -> diff --git a/src/wpool_pool.erl b/src/wpool_pool.erl index 03633ad..0f494e9 100644 --- a/src/wpool_pool.erl +++ b/src/wpool_pool.erl @@ -3,7 +3,7 @@ % except in compliance with the License. You may obtain % a copy of the License at % -% http://www.apache.org/licenses/LICENSE-2.0 +% https://www.apache.org/licenses/LICENSE-2.0 % % Unless required by applicable law or agreed to in writing, % software distributed under the License is distributed on an @@ -47,7 +47,7 @@ %% API functions %% =================================================================== -%% @doc Starts a supervisor with several {@link wpool_process}es as its children +%% @doc Starts a supervisor with several `wpool_process'es as its children -spec start_link(wpool:name(), [wpool:option()]) -> {ok, pid()} | {error, {already_started, pid()} | term()}. start_link(Name, Options) -> @@ -124,7 +124,7 @@ call_available_worker(Name, Call, Timeout) -> %% @doc Picks the first available worker and sends the request to it. %% The timeout provided considers only the time it takes to get a worker -spec send_request_available_worker(wpool:name(), any(), timeout()) -> - noproc | timeout | reference(). + noproc | timeout | gen_server:request_id(). send_request_available_worker(Name, Call, Timeout) -> wpool_queue_manager:send_request_available_worker(queue_manager_name(Name), Call, @@ -245,9 +245,9 @@ task({_TaskId, Started, Task}) -> %% @doc Set next within the worker pool record. Useful when using %% a custom strategy function. -spec next(pos_integer(), wpool()) -> wpool(). -next(Next, WPool = #wpool{next = Atomic}) -> +next(Next, #wpool{next = Atomic} = Wpool) -> atomics:put(Atomic, 1, Next), - WPool. + Wpool. %% @doc Adds a callback module. %% The module must implement the
wpool_process_callbacks
behaviour. @@ -266,10 +266,10 @@ remove_callback_module(Pool, Module) -> %% strategy function. -spec wpool_get(atom(), wpool()) -> any(); ([atom()], wpool()) -> any(). -wpool_get(List, WPool) when is_list(List) -> - [g(Atom, WPool) || Atom <- List]; -wpool_get(Atom, WPool) when is_atom(Atom) -> - g(Atom, WPool). +wpool_get(List, Wpool) when is_list(List) -> + [g(Atom, Wpool) || Atom <- List]; +wpool_get(Atom, Wpool) when is_atom(Atom) -> + g(Atom, Wpool). g(name, #wpool{name = Ret}) -> Ret; @@ -455,15 +455,15 @@ store_wpool(Name, Size, Options) -> Atomic = atomics:new(1, [{signed, false}]), atomics:put(Atomic, 1, 1), WorkerNames = list_to_tuple([worker_name(Name, I) || I <- lists:seq(1, Size)]), - WPool = + Wpool = #wpool{name = Name, size = Size, next = Atomic, workers = WorkerNames, opts = Options, qmanager = queue_manager_name(Name)}, - persistent_term:put({?MODULE, Name}, WPool), - WPool. + persistent_term:put({?MODULE, Name}, Wpool), + Wpool. %% @doc Use this function to get the Worker pool record in a custom worker. -spec find_wpool(atom()) -> undefined | wpool(). @@ -471,8 +471,8 @@ find_wpool(Name) -> try {erlang:whereis(Name), persistent_term:get({?MODULE, Name})} of {undefined, _} -> undefined; - {_, WPool} -> - WPool + {_, Wpool} -> + Wpool catch _:badarg -> build_wpool(Name) diff --git a/src/wpool_process.erl b/src/wpool_process.erl index 7e8a8bf..4a88ab7 100644 --- a/src/wpool_process.erl +++ b/src/wpool_process.erl @@ -3,7 +3,7 @@ % except in compliance with the License. You may obtain % a copy of the License at % -% http://www.apache.org/licenses/LICENSE-2.0 +% https://www.apache.org/licenses/LICENSE-2.0 % % Unless required by applicable law or agreed to in writing, % software distributed under the License is distributed on an @@ -12,7 +12,7 @@ % specific language governing permissions and limitations % under the License. %%% @author Fernando Benavides -%%% @doc Decorator over {@link gen_server} that lets {@link wpool_pool} +%%% @doc Decorator over `gen_server' that lets `wpool_pool' %%% control certain aspects of the execution -module(wpool_process). @@ -28,11 +28,22 @@ overrun_warning := timeout(), _ => _}}). --type state() :: #state{}. +-opaque state() :: #state{}. + +-export_type([state/0]). + -type from() :: {pid(), reference()}. + +-export_type([from/0]). + -type next_step() :: timeout() | hibernate | {continue, term()}. + +-export_type([next_step/0]). + -type options() :: [{time_checker | queue_manager, atom()} | wpool:option()]. +-export_type([options/0]). + %% api -export([start_link/4, call/3, cast/2, send_request/2]). %% gen_server callbacks diff --git a/src/wpool_process_callbacks.erl b/src/wpool_process_callbacks.erl index 6f59897..cd1901b 100644 --- a/src/wpool_process_callbacks.erl +++ b/src/wpool_process_callbacks.erl @@ -9,8 +9,13 @@ -export([notify/3, add_callback_module/2, remove_callback_module/2]). -type state() :: module(). + +-export_type([state/0]). + -type event() :: handle_init_start | handle_worker_creation | handle_worker_death. +-export_type([event/0]). + -callback handle_init_start(wpool:name()) -> any(). -callback handle_worker_creation(wpool:name()) -> any(). -callback handle_worker_death(wpool:name(), term()) -> any(). diff --git a/src/wpool_process_sup.erl b/src/wpool_process_sup.erl index 24b58ef..f0b2a23 100644 --- a/src/wpool_process_sup.erl +++ b/src/wpool_process_sup.erl @@ -3,7 +3,7 @@ % except in compliance with the License. You may obtain % a copy of the License at % -% http://www.apache.org/licenses/LICENSE-2.0 +% https://www.apache.org/licenses/LICENSE-2.0 % % Unless required by applicable law or agreed to in writing, % software distributed under the License is distributed on an @@ -11,7 +11,7 @@ % KIND, either express or implied. See the License for the % specific language governing permissions and limitations % under the License. -%%% @hidden +%%% @private -module(wpool_process_sup). -behaviour(supervisor). @@ -33,14 +33,8 @@ init({Name, Options}) -> Workers = proplists:get_value(workers, Options, 100), Strategy = proplists:get_value(strategy, Options, {one_for_one, 5, 60}), maybe_add_event_handler(Options), - {WorkerType, Worker, InitArgs} = - case proplists:get_value(worker_type, Options, gen_server) of - gen_server -> - {W, IA} = proplists:get_value(worker, Options, {wpool_worker, undefined}), - {wpool_process, W, IA} - end, - %% We'll eventually add more types (like gen_statem), - %% that's why this case remains + {W, IA} = proplists:get_value(worker, Options, {wpool_worker, undefined}), + {WorkerType, Worker, InitArgs} = {wpool_process, W, IA}, WorkerShutdown = proplists:get_value(worker_shutdown, Options, 5000), WorkerSpecs = [{wpool_pool:worker_name(Name, I), diff --git a/src/wpool_queue_manager.erl b/src/wpool_queue_manager.erl index add2e12..69074fd 100644 --- a/src/wpool_queue_manager.erl +++ b/src/wpool_queue_manager.erl @@ -3,7 +3,7 @@ % except in compliance with the License. You may obtain % a copy of the License at % -% http://www.apache.org/licenses/LICENSE-2.0 +% https://www.apache.org/licenses/LICENSE-2.0 % % Unless required by applicable law or agreed to in writing, % software distributed under the License is distributed on an @@ -11,7 +11,7 @@ % KIND, either express or implied. See the License for the % specific language governing permissions and limitations % under the License. -%%% @hidden +%%% @private -module(wpool_queue_manager). -behaviour(gen_server). @@ -30,28 +30,44 @@ monitors :: #{atom() := monitored_from()}, queue_type :: queue_type()}). --type state() :: #state{}. --type from() :: {pid(), reference()}. +-opaque state() :: #state{}. + +-export_type([state/0]). + +-type from() :: {pid(), gen_server:reply_tag()}. + +-export_type([from/0]). + -type monitored_from() :: {reference(), from()}. -type options() :: [{option(), term()}]. + +-export_type([options/0]). + -type option() :: queue_type. -type args() :: [{arg(), term()}]. + +-export_type([args/0]). + -type arg() :: option() | pool. -type queue_mgr() :: atom(). -type queue_type() :: fifo | lifo. +-type worker_event() :: new_worker | worker_dead | worker_busy | worker_ready. + +-export_type([worker_event/0]). + +-type call_request() :: {available_worker, infinity | pos_integer()} | pending_task_count. +-export_type([call_request/0]). -export_type([queue_mgr/0, queue_type/0]). %%%=================================================================== %%% API %%%=================================================================== -%% @equiv start_link(WPool, Name, []) -spec start_link(wpool:name(), queue_mgr()) -> {ok, pid()} | {error, {already_started, pid()} | term()}. start_link(WPool, Name) -> start_link(WPool, Name, []). -%% @private -spec start_link(wpool:name(), queue_mgr(), options()) -> {ok, pid()} | {error, {already_started, pid()} | term()}. start_link(WPool, Name, Options) -> @@ -80,7 +96,7 @@ cast_to_available_worker(QueueManager, Cast) -> %% @doc returns the first available worker in the pool -spec send_request_available_worker(queue_mgr(), any(), timeout()) -> - noproc | timeout | reference(). + noproc | timeout | gen_server:request_id(). send_request_available_worker(QueueManager, Call, Timeout) -> case get_available_worker(QueueManager, Call, Timeout) of {ok, _TimeLeft, Worker} -> @@ -118,7 +134,6 @@ pending_task_count(QueueManager) -> %%%=================================================================== %%% gen_server callbacks %%%=================================================================== -%% @private -spec init(args()) -> {ok, state()}. init(Args) -> WPool = proplists:get_value(pool, Args), @@ -131,9 +146,6 @@ init(Args) -> monitors = #{}, queue_type = QueueType}}. --type worker_event() :: new_worker | worker_dead | worker_busy | worker_ready. - -%% @private -spec handle_cast({worker_event(), atom()}, state()) -> {noreply, state()}. handle_cast({new_worker, Worker}, State) -> handle_cast({worker_ready, Worker}, State); @@ -187,12 +199,9 @@ handle_cast({cast_to_available_worker, Cast}, State) -> {noreply, State#state{workers = NewWorkers}} end. --type call_request() :: {available_worker, infinity | pos_integer()} | pending_task_count. - -%% @private -spec handle_call(call_request(), from(), state()) -> {reply, {ok, atom()}, state()} | {noreply, state()}. -handle_call({available_worker, ExpiresAt}, Client = {ClientPid, _Ref}, State) -> +handle_call({available_worker, ExpiresAt}, {ClientPid, _Ref} = Client, State) -> #state{workers = Workers, clients = Clients} = State, case gb_sets:is_empty(Workers) of true -> @@ -212,11 +221,10 @@ handle_call({available_worker, ExpiresAt}, Client = {ClientPid, _Ref}, State) -> handle_call(pending_task_count, _From, State) -> {reply, get(pending_tasks), State}. -%% @private -spec handle_info(any(), state()) -> {noreply, state()}. handle_info({'DOWN', Ref, Type, {Worker, _Node}, Exit}, State) -> handle_info({'DOWN', Ref, Type, Worker, Exit}, State); -handle_info({'DOWN', _, _, Worker, Exit}, State = #state{monitors = Mons}) -> +handle_info({'DOWN', _, _, Worker, Exit}, #state{monitors = Mons} = State) -> case Mons of #{Worker := {_Ref, Client}} -> gen_server:reply(Client, {'EXIT', Worker, Exit}), @@ -282,7 +290,7 @@ is_expired(ExpiresAt) -> now_in_milliseconds() -> erlang:system_time(millisecond). -monitor_worker(Worker, Client, State = #state{monitors = Mons}) -> +monitor_worker(Worker, Client, #state{monitors = Mons} = State) -> Ref = monitor(process, Worker), State#state{monitors = maps:put(Worker, {Ref, Client}, Mons)}. diff --git a/src/wpool_sup.erl b/src/wpool_sup.erl index 2c4baae..4143943 100644 --- a/src/wpool_sup.erl +++ b/src/wpool_sup.erl @@ -3,7 +3,7 @@ % except in compliance with the License. You may obtain % a copy of the License at % -% http://www.apache.org/licenses/LICENSE-2.0 +% https://www.apache.org/licenses/LICENSE-2.0 % % Unless required by applicable law or agreed to in writing, % software distributed under the License is distributed on an @@ -11,7 +11,7 @@ % KIND, either express or implied. See the License for the % specific language governing permissions and limitations % under the License. -%%% @hidden +%%% @private -module(wpool_sup). -behaviour(supervisor). @@ -47,7 +47,6 @@ stop_pool(Name) -> %%---------------------------------------------------------------------- %% Supervisor behaviour callbacks %%---------------------------------------------------------------------- -%% @hidden -spec init([]) -> {ok, {{simple_one_for_one, 5, 60}, [supervisor:child_spec()]}}. init([]) -> {ok, diff --git a/src/wpool_time_checker.erl b/src/wpool_time_checker.erl index cf65c88..b8d2bee 100644 --- a/src/wpool_time_checker.erl +++ b/src/wpool_time_checker.erl @@ -3,7 +3,7 @@ % except in compliance with the License. You may obtain % a copy of the License at % -% http://www.apache.org/licenses/LICENSE-2.0 +% https://www.apache.org/licenses/LICENSE-2.0 % % Unless required by applicable law or agreed to in writing, % software distributed under the License is distributed on an @@ -11,22 +11,32 @@ % KIND, either express or implied. See the License for the % specific language governing permissions and limitations % under the License. -%%% @hidden +%%% @private -module(wpool_time_checker). -behaviour(gen_server). -type handler() :: {atom(), atom()}. +-export_type([handler/0]). + -record(state, {wpool :: wpool:name(), handlers :: [handler()]}). --type state() :: #state{}. +-opaque state() :: #state{}. + +-export_type([state/0]). + +-type from() :: {pid(), reference()}. + +-export_type([from/0]). %% api -export([start_link/3, add_handler/2]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2]). +-elvis([{elvis_style, no_catch_expressions, disable}]). + %%%=================================================================== %%% API %%%=================================================================== @@ -56,10 +66,8 @@ init({WPool, Handlers}) -> handle_cast(_Cast, State) -> {noreply, State}. --type from() :: {pid(), reference()}. - -spec handle_call({add_handler, handler()}, from(), state()) -> {reply, ok, state()}. -handle_call({add_handler, Handler}, _, State = #state{handlers = Handlers}) -> +handle_call({add_handler, Handler}, _, #state{handlers = Handlers} = State) -> {reply, ok, State#state{handlers = [Handler | Handlers]}}. %%%=================================================================== diff --git a/src/wpool_utils.erl b/src/wpool_utils.erl index b9ce477..9c4c623 100644 --- a/src/wpool_utils.erl +++ b/src/wpool_utils.erl @@ -3,7 +3,7 @@ % except in compliance with the License. You may obtain % a copy of the License at % -% http://www.apache.org/licenses/LICENSE-2.0 +% https://www.apache.org/licenses/LICENSE-2.0 % % Unless required by applicable law or agreed to in writing, % software distributed under the License is distributed on an @@ -15,8 +15,6 @@ %%% @doc Common functions for wpool_process and other modules. -module(wpool_utils). --author('ferigis@gmail.com'). - %% API -export([task_init/2, task_end/1, add_defaults/1]). diff --git a/src/wpool_worker.erl b/src/wpool_worker.erl index 044f6ea..eb8f909 100644 --- a/src/wpool_worker.erl +++ b/src/wpool_worker.erl @@ -3,7 +3,7 @@ % except in compliance with the License. You may obtain % a copy of the License at % -% http://www.apache.org/licenses/LICENSE-2.0 +% https://www.apache.org/licenses/LICENSE-2.0 % % Unless required by applicable law or agreed to in writing, % software distributed under the License is distributed on an @@ -12,7 +12,7 @@ % specific language governing permissions and limitations % under the License. %%% @author Fernando Benavides -%%% @doc Default instance for {@link wpool_process} +%%% @doc Default instance for `wpool_process' -module(wpool_worker). -behaviour(gen_server). @@ -22,6 +22,16 @@ %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2]). +-record(state, {}). + +-opaque state() :: #state{}. + +-export_type([state/0]). + +-type from() :: {pid(), reference()}. + +-export_type([from/0]). + %%%=================================================================== %%% API %%%=================================================================== @@ -44,10 +54,6 @@ cast(S, M, F, A) -> %%% simple callbacks %%%=================================================================== --record(state, {}). - --type state() :: #state{}. - %% @private -spec init(undefined) -> {ok, state()}. init(undefined) -> @@ -71,8 +77,6 @@ handle_cast(Cast, State) -> error_logger:error_msg("Invalid cast:~p", [Cast]), {noreply, State, hibernate}. --type from() :: {pid(), reference()}. - %% @private -spec handle_call(term(), from(), state()) -> {reply, {ok, term()} | {error, term()}, state(), hibernate}. diff --git a/test/crashy_server.erl b/test/crashy_server.erl index e58a658..e3c30e0 100644 --- a/test/crashy_server.erl +++ b/test/crashy_server.erl @@ -3,7 +3,7 @@ % except in compliance with the License. You may obtain % a copy of the License at % -% http://www.apache.org/licenses/LICENSE-2.0 +% https://www.apache.org/licenses/LICENSE-2.0 % % Unless required by applicable law or agreed to in writing, % software distributed under the License is distributed on an @@ -22,10 +22,14 @@ -dialyzer([no_behaviours]). +-type from() :: {pid(), reference()}. + +-export_type([from/0]). + %%%=================================================================== %%% callbacks %%%=================================================================== --spec init(Something) -> Something. +-spec init(Something) -> {ok, Something}. init(Something) -> {ok, Something}. @@ -49,8 +53,6 @@ handle_cast(crash, _State) -> handle_cast(Cast, _State) -> Cast. --type from() :: {pid(), reference()}. - -spec handle_call(state | Call, from(), State) -> {reply, State, State} | Call. handle_call(state, _From, State) -> {reply, State, State}; diff --git a/test/echo_server.erl b/test/echo_server.erl index d748ff8..09a4405 100644 --- a/test/echo_server.erl +++ b/test/echo_server.erl @@ -3,7 +3,7 @@ % except in compliance with the License. You may obtain % a copy of the License at % -% http://www.apache.org/licenses/LICENSE-2.0 +% https://www.apache.org/licenses/LICENSE-2.0 % % Unless required by applicable law or agreed to in writing, % software distributed under the License is distributed on an @@ -22,6 +22,10 @@ -dialyzer([no_behaviours]). +-type from() :: {pid(), reference()}. + +-export_type([from/0]). + %%%=================================================================== %%% callbacks %%%=================================================================== @@ -47,8 +51,6 @@ handle_info(Info, _State) -> handle_cast(Cast, _State) -> Cast. --type from() :: {pid(), reference()}. - -spec handle_call(Call, from(), term()) -> Call. handle_call(Call, _From, _State) -> Call. diff --git a/test/sleepy_server.erl b/test/sleepy_server.erl index 3f7014f..8a5993e 100644 --- a/test/sleepy_server.erl +++ b/test/sleepy_server.erl @@ -3,7 +3,7 @@ % except in compliance with the License. You may obtain % a copy of the License at % -% http://www.apache.org/licenses/LICENSE-2.0 +% https://www.apache.org/licenses/LICENSE-2.0 % % Unless required by applicable law or agreed to in writing, % software distributed under the License is distributed on an @@ -21,6 +21,10 @@ -dialyzer([no_behaviours]). +-type from() :: {pid(), reference()}. + +-export_type([from/0]). + %%%=================================================================== %%% callbacks %%%=================================================================== @@ -36,8 +40,6 @@ handle_cast(TimeToSleep, State) -> _ = timer:sleep(TimeToSleep), {noreply, State}. --type from() :: {pid(), reference()}. - -spec handle_call(pos_integer(), from(), State) -> {reply, ok, State}. handle_call(TimeToSleep, _From, State) -> _ = timer:sleep(TimeToSleep), diff --git a/test/wpool_SUITE.erl b/test/wpool_SUITE.erl index f52894a..c7a8eea 100644 --- a/test/wpool_SUITE.erl +++ b/test/wpool_SUITE.erl @@ -3,7 +3,7 @@ % except in compliance with the License. You may obtain % a copy of the License at % -% http://www.apache.org/licenses/LICENSE-2.0 +% https://www.apache.org/licenses/LICENSE-2.0 % % Unless required by applicable law or agreed to in writing, % software distributed under the License is distributed on an @@ -21,6 +21,8 @@ -type config() :: [{atom(), term()}]. +-export_type([config/0]). + -export([all/0]). -export([init_per_suite/1, end_per_suite/1]). -export([stats/1, stop_pool/1, non_brutal_shutdown/1, brutal_worker_shutdown/1, overrun/1, @@ -28,6 +30,10 @@ overrun_handler2/1, default_options/1, complete_coverage/1, broadcast/1, send_request/1, worker_killed_stats/1]). +-elvis([{elvis_style, no_block_expressions, disable}]). + +-dialyzer({no_underspecs, all/0}). + -spec all() -> [atom()]. all() -> [too_much_overrun, @@ -54,11 +60,11 @@ end_per_suite(Config) -> wpool:stop(), Config. --spec overrun_handler1(M) -> M. +-spec overrun_handler1(M) -> {overrun1, M}. overrun_handler1(M) -> overrun_handler ! {overrun1, M}. --spec overrun_handler2(M) -> M. +-spec overrun_handler2(M) -> {overrun2, M}. overrun_handler2(M) -> overrun_handler ! {overrun2, M}. diff --git a/test/wpool_pool_SUITE.erl b/test/wpool_pool_SUITE.erl index de45dc3..08ca8e5 100644 --- a/test/wpool_pool_SUITE.erl +++ b/test/wpool_pool_SUITE.erl @@ -3,7 +3,7 @@ % except in compliance with the License. You may obtain % a copy of the License at % -% http://www.apache.org/licenses/LICENSE-2.0 +% https://www.apache.org/licenses/LICENSE-2.0 % % Unless required by applicable law or agreed to in writing, % software distributed under the License is distributed on an @@ -19,6 +19,8 @@ -type config() :: [{atom(), term()}]. +-export_type([config/0]). + -define(WORKERS, 6). -export([all/0]). @@ -28,6 +30,9 @@ queue_type_fifo/1, queue_type_lifo/1, get_workers/1]). -export([manager_crash/1, super_fast/1, mess_up_with_store/1]). +-elvis([{elvis_style, no_block_expressions, disable}]). +-elvis([{elvis_style, no_catch_expressions, disable}]). + -spec all() -> [atom()]. all() -> [Fun diff --git a/test/wpool_process_SUITE.erl b/test/wpool_process_SUITE.erl index 5acc37f..36f7a0d 100644 --- a/test/wpool_process_SUITE.erl +++ b/test/wpool_process_SUITE.erl @@ -3,7 +3,7 @@ % except in compliance with the License. You may obtain % a copy of the License at % -% http://www.apache.org/licenses/LICENSE-2.0 +% https://www.apache.org/licenses/LICENSE-2.0 % % Unless required by applicable law or agreed to in writing, % software distributed under the License is distributed on an @@ -19,6 +19,8 @@ -type config() :: [{atom(), term()}]. +-export_type([config/0]). + -export([all/0]). -export([init_per_suite/1, end_per_suite/1, init_per_testcase/2, end_per_testcase/2]). -export([init/1, init_timeout/1, info/1, cast/1, send_request/1, call/1, continue/1, diff --git a/test/wpool_process_callbacks_SUITE.erl b/test/wpool_process_callbacks_SUITE.erl index 6f0a872..01f1eac 100644 --- a/test/wpool_process_callbacks_SUITE.erl +++ b/test/wpool_process_callbacks_SUITE.erl @@ -4,6 +4,8 @@ -type config() :: [{atom(), term()}]. +-export_type([config/0]). + -export([all/0]). -export([init_per_suite/1, end_per_suite/1]). -export([complete_callback_passed_when_starting_pool/1, @@ -12,6 +14,8 @@ crashing_callback_does_not_affect_others/1, non_existsing_module_does_not_affect_others/1, complete_coverage/1]). +-dialyzer({no_underspecs, all/0}). + -spec all() -> [atom()]. all() -> [complete_callback_passed_when_starting_pool, diff --git a/test/wpool_worker_SUITE.erl b/test/wpool_worker_SUITE.erl index 527bf85..20a1d16 100644 --- a/test/wpool_worker_SUITE.erl +++ b/test/wpool_worker_SUITE.erl @@ -3,7 +3,7 @@ % except in compliance with the License. You may obtain % a copy of the License at % -% http://www.apache.org/licenses/LICENSE-2.0 +% https://www.apache.org/licenses/LICENSE-2.0 % % Unless required by applicable law or agreed to in writing, % software distributed under the License is distributed on an @@ -19,6 +19,8 @@ -type config() :: [{atom(), term()}]. +-export_type([config/0]). + -export([all/0]). -export([init_per_suite/1, end_per_suite/1]). -export([call/1, cast/1, complete_coverage/1]).