diff --git a/CHANGELOG.md b/CHANGELOG.md index 19ffba1..bda9ccc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,17 @@ All notable changes to the LaunchDarkly Erlang/Elixir SDK will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org). +## [1.0.0-alpha4] - 2019-12-19 + +### Added + +- Support for experimentation features. See `eld:track_with_metric/4-5`. + +### Fixed + +- Custom URI configuration is now consistent with other SDKs +- Bucketing logic for custom non-string attributes is brought in line with the other SDKs + ## [1.0.0-alpha3] - 2019-10-29 ### Fixed diff --git a/priv/flags-segments-put-data-another1.json b/priv/flags-segments-put-data-another1.json index 907f6cd..2a4d394 100644 --- a/priv/flags-segments-put-data-another1.json +++ b/priv/flags-segments-put-data-another1.json @@ -18,6 +18,7 @@ "sel": "8b4d79c59adb4df492ebea0bf65dfd4d", "targets": [], "trackEvents": true, + "trackEventsFallthrough": false, "variations": [ true, false @@ -53,6 +54,7 @@ } ], "trackEvents": true, + "trackEventsFallthrough": false, "variations": [ true, false @@ -83,6 +85,7 @@ } ], "id": "ab4a9fb3-7e85-429f-8078-23aa70094540", + "trackEvents": false, "variation": 1 }, { @@ -97,6 +100,7 @@ } ], "id": "489a185d-caaf-4db9-b192-e09e927d070c", + "trackEvents": false, "variation": 1 } ], @@ -104,6 +108,7 @@ "sel": "45501b9314dc4641841af774cb038b96", "targets": [], "trackEvents": true, + "trackEventsFallthrough": false, "variations": [ true, false diff --git a/priv/flags-segments-put-data.json b/priv/flags-segments-put-data.json index a75c36b..b9ea2f2 100644 --- a/priv/flags-segments-put-data.json +++ b/priv/flags-segments-put-data.json @@ -18,6 +18,7 @@ "sel": "8b4d79c59adb4df492ebea0bf65dfd4d", "targets": [], "trackEvents": true, + "trackEventsFallthrough": false, "variations": [ true, false @@ -40,6 +41,7 @@ "sel": "8b4d79c59adb4df492ebea0bf65dfd4d", "targets": [], "trackEvents": true, + "trackEventsFallthrough": false, "variations": [ true, false @@ -62,6 +64,7 @@ "sel": "0c7fec46297e468089792dd2f5793ed8", "targets": [], "trackEvents": true, + "trackEventsFallthrough": false, "variations": [ true, false @@ -84,6 +87,7 @@ "sel": "8b4d79c59adb4df492ebea0bf65dfd4d", "targets": [], "trackEvents": true, + "trackEventsFallthrough": false, "variations": [ true, false @@ -111,6 +115,7 @@ "sel": "45501b9314dc4641841af774cb038b96", "targets": [], "trackEvents": true, + "trackEventsFallthrough": false, "variations": [ true, false @@ -138,6 +143,7 @@ "sel": "45501b9314dc4641841af774cb038b96", "targets": [], "trackEvents": true, + "trackEventsFallthrough": false, "variations": [ true, false @@ -165,6 +171,7 @@ "sel": "45501b9314dc4641841af774cb038b96", "targets": [], "trackEvents": true, + "trackEventsFallthrough": false, "variations": [ true, false @@ -192,6 +199,7 @@ "sel": "8b4d79c59adb4df492ebea0bf65dfd4d", "targets": [], "trackEvents": true, + "trackEventsFallthrough": false, "variations": [ true, false @@ -214,6 +222,7 @@ "sel": "8b4d79c59adb4df492ebea0bf65dfd4d", "targets": [], "trackEvents": true, + "trackEventsFallthrough": false, "variations": [ true, false @@ -236,6 +245,7 @@ "sel": "8b4d79c59adb4df492ebea0bf65dfd4d", "targets": [], "trackEvents": true, + "trackEventsFallthrough": false, "variations": [ true, false @@ -267,6 +277,7 @@ "sel": "45501b9314dc4641841af774cb038b96", "targets": [], "trackEvents": true, + "trackEventsFallthrough": false, "variations": [ true, false @@ -303,6 +314,7 @@ } ], "trackEvents": true, + "trackEventsFallthrough": false, "variations": [ true, false @@ -333,6 +345,7 @@ } ], "id": "ab4a9fb3-7e85-429f-8078-23aa70094540", + "trackEvents": false, "variation": 1 }, { @@ -347,6 +360,7 @@ } ], "id": "489a185d-caaf-4db9-b192-e09e927d070c", + "trackEvents": false, "variation": 1 } ], @@ -354,6 +368,7 @@ "sel": "45501b9314dc4641841af774cb038b96", "targets": [], "trackEvents": true, + "trackEventsFallthrough": false, "variations": [ true, false @@ -384,6 +399,7 @@ } ], "id": "08b9b261-5df6-4881-892b-e25bdb28b6d3", + "trackEvents": false, "variation": 0 }, { @@ -398,6 +414,7 @@ } ], "id": "2fac50d0-d912-424a-831e-ab60ad0547b4", + "trackEvents": false, "variation": 1 }, { @@ -412,6 +429,7 @@ } ], "id": "e3b70ddf-a000-4649-93c5-ac0eaea675f8", + "trackEvents": false, "variation": 2 }, { @@ -426,6 +444,7 @@ } ], "id": "1d63c99a-3016-4778-bf1f-68d1fce5004e", + "trackEvents": false, "variation": 3 }, { @@ -440,6 +459,7 @@ } ], "id": "1f1dadfc-0e66-42e0-b479-979186d972ce", + "trackEvents": false, "variation": 4 }, { @@ -454,6 +474,7 @@ } ], "id": "ca092500-1cb7-4b14-a11c-81b46ca19cae", + "trackEvents": false, "variation": 5 }, { @@ -468,6 +489,7 @@ } ], "id": "d38e11f8-93d1-453e-8022-6d8ed7f106ea", + "trackEvents": false, "variation": 6 }, { @@ -482,6 +504,7 @@ } ], "id": "a92a93c2-2004-482b-9e4a-38abe81d7050", + "trackEvents": false, "variation": 7 }, { @@ -496,6 +519,7 @@ } ], "id": "9158e01a-a70f-4924-8cf8-9401e2cf6c67", + "trackEvents": false, "variation": 8 }, { @@ -510,6 +534,7 @@ } ], "id": "500633a7-2c82-4baf-8201-4892b68b31b4", + "trackEvents": false, "variation": 9 }, { @@ -524,6 +549,7 @@ } ], "id": "77473bea-d93f-4787-84d2-92cf08b35f2b", + "trackEvents": false, "variation": 10 }, { @@ -538,6 +564,7 @@ } ], "id": "9398cafc-0ab7-4d0d-8e01-6683cc4d17ec", + "trackEvents": false, "variation": 11 }, { @@ -552,6 +579,7 @@ } ], "id": "3570714f-d03b-4068-ab79-18f15c74382d", + "trackEvents": false, "variation": 12 }, { @@ -566,6 +594,7 @@ } ], "id": "2c002923-2db0-4fcc-a95e-f3cb5b4bd13d", + "trackEvents": false, "variation": 13 }, { @@ -580,6 +609,7 @@ } ], "id": "b6c5ceec-364d-4c23-a041-7865f4f136d3", + "trackEvents": false, "variation": 14 }, { @@ -594,6 +624,7 @@ } ], "id": "764c5346-6478-4d34-83e7-59c0afc7a15b", + "trackEvents": false, "variation": 15 }, { @@ -608,6 +639,7 @@ } ], "id": "726cb09a-2ecb-48b8-b3a9-24163f40163f", + "trackEvents": false, "variation": 16 } ], @@ -615,6 +647,7 @@ "sel": "45501b9314dc4641841af774cb038b96", "targets": [], "trackEvents": true, + "trackEventsFallthrough": false, "variations": [ "a", "b", @@ -675,6 +708,7 @@ "sel": "8b4d79c59adb4df492ebea0bf65dfd4d", "targets": [], "trackEvents": true, + "trackEventsFallthrough": false, "variations": [ "a", "b", @@ -724,6 +758,7 @@ "sel": "c521521b61774e078b48d3d477bd2f22", "targets": [], "trackEvents": true, + "trackEventsFallthrough": false, "variations": [ "a", "b", @@ -757,6 +792,7 @@ } ], "id": "ab4a9fb3-7e85-429f-8078-23aa70094540", + "trackEvents": false, "variation": 1 } ], @@ -764,6 +800,7 @@ "sel": "8b4d79c59adb4df492ebea0bf65dfd4d", "targets": [], "trackEvents": true, + "trackEventsFallthrough": false, "variations": [ true, false diff --git a/relx.config b/relx.config index 64f5f04..0fc07d0 100644 --- a/relx.config +++ b/relx.config @@ -1,2 +1,2 @@ -{release, {eld, "1.0.0-alpha3"}, [eld]}. +{release, {eld, "1.0.0-alpha4"}, [eld]}. {extended_start_script, true}. diff --git a/src/eld.app.src b/src/eld.app.src index 1b89e6b..6da5ff3 100644 --- a/src/eld.app.src +++ b/src/eld.app.src @@ -1,6 +1,6 @@ {application, eld, [{description, "LaunchDarkly Erlang SDK"}, - {vsn, "1.0.0-alpha3"}, + {vsn, "1.0.0-alpha4"}, {registered, []}, {mod, {eld_app, []}}, {applications, diff --git a/src/eld.erl b/src/eld.erl index 395fe0b..9d10b11 100644 --- a/src/eld.erl +++ b/src/eld.erl @@ -25,6 +25,8 @@ -export([identify/2]). -export([track/3]). -export([track/4]). +-export([track_with_metric/4]). +-export([track_with_metric/5]). %% Types -type feature_flags_state() :: #{ @@ -204,3 +206,26 @@ track(Key, User, Data) when is_binary(Key), is_map(Data) -> track(Key, User, Data, Tag) when is_atom(Tag), is_binary(Key), is_map(Data) -> Event = eld_event:new_custom(Key, User, Data), eld_event_server:add_event(Tag, Event, #{}). + +%% @doc Reports that a user has performed an event, and associates it with a numeric value. +%% +%% This value is used by the LaunchDarkly experimentation feature in numeric custom metrics, and will also +%% be returned as part of the custom event for Data Export. +%% +%% Custom data can also be attached to the event. +%% @end +-spec track_with_metric(Key :: binary(), User :: eld_user:user(), Data :: map(), Metric :: number()) -> ok. +track_with_metric(Key, User, Data, Metric) -> + track_with_metric(Key, User, Data, Metric, ?DEFAULT_INSTANCE_NAME). + +%% @doc Reports that a user has performed an event, and associates it with a numeric value. +%% +%% This value is used by the LaunchDarkly experimentation feature in numeric custom metrics, and will also +%% be returned as part of the custom event for Data Export. +%% +%% Custom data can also be attached to the event. +%% @end +-spec track_with_metric(Key :: binary(), User :: eld_user:user(), Data :: map(), Metric :: number(), Tag :: atom()) -> ok. +track_with_metric(Key, User, Data, Metric, Tag) -> + Event = eld_event:new_custom(Key, User, Data, Metric), + eld_event_server:add_event(Tag, Event, #{}). diff --git a/src/eld_event.erl b/src/eld_event.erl index 65570b0..21f38a0 100644 --- a/src/eld_event.erl +++ b/src/eld_event.erl @@ -15,20 +15,55 @@ -export([new_identify/1]). -export([new_index/2]). -export([new_custom/3]). +-export([new_custom/4]). -export([strip_eval_reason/1]). %% Types --type event() :: #{ - type := event_type(), - timestamp := non_neg_integer(), - key => binary(), - user => eld_user:user(), - data => map() -}. +-type event() :: feature_request() | identify() | index() | custom(). -type event_type() :: identify | index | feature_request | custom. %% Event type +-type feature_request() :: #{ + type := feature_request, + timestamp := non_neg_integer(), + user := eld_user:user(), + data := #{ + key := eld_flag:key(), + variation := eld_flag:variation() | null, + value := eld_eval:result_value(), + default := eld_eval:result_value(), + version := eld_flag:version() | null, + prereq_of := eld_flag:key() | null, + track_events := boolean() | null, + debug_events_until_date := boolean() | null, + eval_reason := eld_eval:reason() | null, + include_reason := boolean(), + debug := boolean() + } +}. + +-type identify() :: #{ + type := identify, + timestamp := non_neg_integer(), + user := eld_user:user() +}. + +-type index() :: #{ + type := index, + timestamp := non_neg_integer(), + user := eld_user:user() +}. + +-type custom() :: #{ + type := custom, + timestamp := non_neg_integer(), + key := binary(), + user := eld_user:user(), + data => map(), + metric_value => number() +}. + -export_type([event/0]). %%=================================================================== @@ -69,7 +104,8 @@ new(feature_request, User, Timestamp, #{ prereq_of := PrereqOf, % null | eld_flag:key() track_events := TrackEvents, % null | boolean() debug_events_until_date := DebugEventsUntilDate, % null | boolean() - eval_reason := EvalReason % null | eld_eval:reason() + eval_reason := EvalReason, % null | eld_eval:reason() + include_reason := IncludeReason % boolean() }) -> #{ type => feature_request, @@ -85,6 +121,7 @@ new(feature_request, User, Timestamp, #{ track_events => TrackEvents, debug_events_until_date => DebugEventsUntilDate, eval_reason => EvalReason, + include_reason => IncludeReason, debug => false } }. @@ -122,6 +159,7 @@ new_for_unknown_flag(FlagKey, User, DefaultValue, Reason) -> track_events => null, debug_events_until_date => null, eval_reason => Reason, + include_reason => false, debug => false }, new(feature_request, User, erlang:system_time(milli_seconds), EventData). @@ -131,7 +169,7 @@ new_for_unknown_flag(FlagKey, User, DefaultValue, Reason) -> VariationValue :: eld_eval:result_value(), DefaultValue :: eld_eval:result_value(), User :: eld_user:user(), - Reason :: eld_eval:reason(), + Reason :: eld_eval:reason() | null, Flag :: eld_flag:flag() ) -> event(). new_flag_eval(VariationIndex, VariationValue, DefaultValue, User, Reason, #{ @@ -139,7 +177,8 @@ new_flag_eval(VariationIndex, VariationValue, DefaultValue, User, Reason, #{ version := Version, track_events := TrackEvents, debug_events_until_date := DebugEventsUntilDate -}) -> +} = Flag) -> + IsExperiment = is_experiment(Reason, Flag), EventData = #{ key => Key, variation => VariationIndex, @@ -147,9 +186,10 @@ new_flag_eval(VariationIndex, VariationValue, DefaultValue, User, Reason, #{ default => DefaultValue, version => Version, prereq_of => null, - track_events => TrackEvents, + track_events => TrackEvents or IsExperiment, debug_events_until_date => DebugEventsUntilDate, eval_reason => Reason, + include_reason => IsExperiment, debug => false }, new(feature_request, User, erlang:system_time(milli_seconds), EventData). @@ -159,7 +199,7 @@ new_flag_eval(VariationIndex, VariationValue, DefaultValue, User, Reason, #{ VariationValue :: eld_flag:variation_value(), PrerequisiteOf :: eld_flag:key(), User :: eld_user:user(), - Reason :: eld_eval:reason(), + Reason :: eld_eval:reason() | null, Flag :: eld_flag:flag() ) -> event(). new_prerequisite_eval(VariationIndex, VariationValue, PrerequisiteOf, User, Reason, #{ @@ -167,7 +207,8 @@ new_prerequisite_eval(VariationIndex, VariationValue, PrerequisiteOf, User, Reas version := Version, track_events := TrackEvents, debug_events_until_date := DebugEventsUntilDate -}) -> +} = Flag) -> + IsExperiment = is_experiment(Reason, Flag), EventData = #{ key => Key, variation => VariationIndex, @@ -175,9 +216,10 @@ new_prerequisite_eval(VariationIndex, VariationValue, PrerequisiteOf, User, Reas default => null, version => Version, prereq_of => PrerequisiteOf, - track_events => TrackEvents, + track_events => TrackEvents or IsExperiment, debug_events_until_date => DebugEventsUntilDate, eval_reason => Reason, + include_reason => IsExperiment, debug => false }, new(feature_request, User, erlang:system_time(milli_seconds), EventData). @@ -192,8 +234,36 @@ new_index(User, Timestamp) -> -spec new_custom(Key :: binary(), User :: eld_user:user(), Data :: map()) -> event(). new_custom(Key, User, Data) when is_binary(Key), is_map(Data) -> - new(custom, Key, User, erlang:system_time(milli_seconds), Data). + #{ + type => custom, + timestamp => erlang:system_time(milli_seconds), + key => Key, + user => User, + data => Data + }. + +-spec new_custom(Key :: binary(), User :: eld_user:user(), Data :: map(), MetricValue :: number()) -> event(). +new_custom(Key, User, Data, MetricValue) when is_binary(Key), is_map(Data), is_number(MetricValue) -> + #{ + type => custom, + timestamp => erlang:system_time(milli_seconds), + key => Key, + user => User, + data => Data, + metric_value => MetricValue + }. -spec strip_eval_reason(eld_event:event()) -> eld_event:event(). strip_eval_reason(#{type := feature_request, data := Data} = Event) -> Event#{data => maps:remove(eval_reason, Data)}. + +%%=================================================================== +%% Internal functions +%%=================================================================== + +-spec is_experiment(eld_eval:reason(), eld_flag:flag()) -> boolean(). +is_experiment(fallthrough, #{track_events_fallthrough := true}) -> true; +is_experiment({rule_match, RuleIndex, _}, #{rules := Rules}) when RuleIndex >=0, RuleIndex < length(Rules) -> + Rule = lists:nth(RuleIndex + 1, Rules), + maps:get(track_events, Rule, false); +is_experiment(_Reason, _Flag) -> false. diff --git a/src/eld_event_process_server.erl b/src/eld_event_process_server.erl index cb3d265..cdafa07 100644 --- a/src/eld_event_process_server.erl +++ b/src/eld_event_process_server.erl @@ -61,7 +61,7 @@ init([Tag]) -> Dispatcher = eld_settings:get_value(Tag, events_dispatcher), InlineUsers = eld_settings:get_value(Tag, inline_users_in_events), GlobalPrivateAttributes = eld_settings:get_value(Tag, private_attributes), - EventsUri = eld_settings:get_value(Tag, events_uri), + EventsUri = eld_settings:get_value(Tag, events_uri) ++ "/api/events/bulk", State = #{ sdk_key => SdkKey, dispatcher => Dispatcher, @@ -175,14 +175,14 @@ format_event(#{type := index, timestamp := Timestamp, user := User}, {FormattedE }, FormattedEvent = format_event_set_user(Kind, User, OutputEvent, InlineUsers, GlobalPrivateAttributes), {[FormattedEvent|FormattedEvents], InlineUsers, GlobalPrivateAttributes}; -format_event(#{type := custom, timestamp := Timestamp, key := Key, user := User, data := Data}, {FormattedEvents, InlineUsers, GlobalPrivateAttributes}) -> +format_event(#{type := custom, timestamp := Timestamp, key := Key, user := User, data := Data} = Event, {FormattedEvents, InlineUsers, GlobalPrivateAttributes}) -> Kind = <<"custom">>, - OutputEvent = #{ + OutputEvent = maybe_set_metric_value(Event, #{ <<"kind">> => Kind, <<"creationDate">> => Timestamp, <<"key">> => Key, <<"data">> => Data - }, + }), FormattedEvent = format_event_set_user(Kind, User, OutputEvent, InlineUsers, GlobalPrivateAttributes), {[FormattedEvent|FormattedEvents], InlineUsers, GlobalPrivateAttributes}. @@ -240,6 +240,12 @@ format_event_set_user(<<"custom">>, User, OutputEvent, true, GlobalPrivateAttrib format_event_set_user(<<"custom">>, #{key := UserKey}, OutputEvent, false, _) -> OutputEvent#{<<"userKey">> => UserKey}. +-spec maybe_set_metric_value(eld_event:event(), map()) -> map(). +maybe_set_metric_value(#{metric_value := MetricValue}, OutputEvent) -> + OutputEvent#{<<"metricValue">> => MetricValue}; +maybe_set_metric_value(_, OutputEvent) -> + OutputEvent. + -spec format_summary_event(eld_event_server:summary_event()) -> map(). format_summary_event(SummaryEvent) when map_size(SummaryEvent) == 0 -> #{}; format_summary_event(#{start_date := StartDate, end_date := EndDate, counters := Counters}) -> diff --git a/src/eld_event_server.erl b/src/eld_event_server.erl index c8274b1..3229494 100644 --- a/src/eld_event_server.erl +++ b/src/eld_event_server.erl @@ -101,6 +101,8 @@ init([Tag]) -> Capacity = eld_settings:get_value(Tag, events_capacity), InlineUsers = eld_settings:get_value(Tag, inline_users_in_events), TimerRef = erlang:send_after(FlushInterval, self(), {flush, Tag}), + % Need to trap exit so supervisor:terminate_child calls terminate callback + process_flag(trap_exit, true), State = #{ events => [], summary_event => #{}, @@ -140,6 +142,10 @@ handle_info(_Info, State) -> -spec terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), State :: state()) -> term(). +terminate(Reason, #{timer_ref := TimerRef} = _State) -> + error_logger:info_msg("Terminating event service, reason: ~p", [Reason]), + _ = erlang:cancel_timer(TimerRef), + ok; terminate(_Reason, _State) -> ok. @@ -239,6 +245,8 @@ should_add_full_event(_) -> false. [eld_event:event()]. maybe_add_feature_request_full_fidelity(true, Event, #{include_reasons := true}, Events, Capacity) -> add_raw_event(Event, Events, Capacity); +maybe_add_feature_request_full_fidelity(true, #{data := #{include_reason := true}} = Event, _Options, Events, Capacity) -> + add_raw_event(Event, Events, Capacity); maybe_add_feature_request_full_fidelity(true, Event, _Options, Events, Capacity) -> add_raw_event(eld_event:strip_eval_reason(Event), Events, Capacity); maybe_add_feature_request_full_fidelity(false, _Event, _Options, Events, _Capacity) -> diff --git a/src/eld_flag.erl b/src/eld_flag.erl index 63dd955..e35ec72 100644 --- a/src/eld_flag.erl +++ b/src/eld_flag.erl @@ -12,20 +12,21 @@ %% Types -type flag() :: #{ - debug_events_until_date => pos_integer() | null, - deleted => boolean(), - fallthrough => variation_or_rollout(), - key => key(), - off_variation => variation(), - on => boolean(), - prerequisites => [prerequisite()], - rules => [eld_rule:rule()], - salt => binary(), - sel => binary(), - targets => [target()], - track_events => boolean(), - variations => [variation_value()], - version => version() + debug_events_until_date => pos_integer() | null, + deleted => boolean(), + fallthrough => variation_or_rollout(), + key => key(), + off_variation => variation(), + on => boolean(), + prerequisites => [prerequisite()], + rules => [eld_rule:rule()], + salt => binary(), + sel => binary(), + targets => [target()], + track_events => boolean(), + track_events_fallthrough => boolean(), + variations => [variation_value()], + version => version() }. -type key() :: binary(). @@ -74,36 +75,38 @@ -spec new(Key :: eld_flag:key(), Properties :: map()) -> flag(). new(Key, #{ - <<"debugEventsUntilDate">> := DebugEventsUntilDate, - <<"deleted">> := Deleted, - <<"fallthrough">> := Fallthrough, - <<"key">> := Key, - <<"offVariation">> := OffVariation, - <<"on">> := On, - <<"prerequisites">> := Prerequisites, - <<"rules">> := Rules, - <<"salt">> := Salt, - <<"sel">> := Sel, - <<"targets">> := Targets, - <<"trackEvents">> := TrackEvents, - <<"variations">> := Variations, - <<"version">> := Version + <<"debugEventsUntilDate">> := DebugEventsUntilDate, + <<"deleted">> := Deleted, + <<"fallthrough">> := Fallthrough, + <<"key">> := Key, + <<"offVariation">> := OffVariation, + <<"on">> := On, + <<"prerequisites">> := Prerequisites, + <<"rules">> := Rules, + <<"salt">> := Salt, + <<"sel">> := Sel, + <<"targets">> := Targets, + <<"trackEvents">> := TrackEvents, + <<"trackEventsFallthrough">> := TrackEventsFallthrough, + <<"variations">> := Variations, + <<"version">> := Version }) -> #{ - debug_events_until_date => DebugEventsUntilDate, - deleted => Deleted, - fallthrough => parse_variation_or_rollout(Fallthrough), - key => Key, - off_variation => OffVariation, - on => On, - prerequisites => parse_prerequisites(Prerequisites), - rules => parse_rules(Rules), - salt => Salt, - sel => Sel, - targets => parse_targets(Targets), - track_events => TrackEvents, - variations => Variations, - version => Version + debug_events_until_date => DebugEventsUntilDate, + deleted => Deleted, + fallthrough => parse_variation_or_rollout(Fallthrough), + key => Key, + off_variation => OffVariation, + on => On, + prerequisites => parse_prerequisites(Prerequisites), + rules => parse_rules(Rules), + salt => Salt, + sel => Sel, + targets => parse_targets(Targets), + track_events => TrackEvents, + track_events_fallthrough => TrackEventsFallthrough, + variations => Variations, + version => Version }. -spec get_variation(Flag :: flag(), VariationIndex :: non_neg_integer()|null) -> term(). diff --git a/src/eld_instance.erl b/src/eld_instance.erl index be42505..b2a2d25 100644 --- a/src/eld_instance.erl +++ b/src/eld_instance.erl @@ -15,9 +15,15 @@ base_uri => string(), stream_uri => string(), storage_backend => atom(), + events_capacity => pos_integer(), events_flush_interval => pos_integer(), + events_dispatcher => atom(), user_keys_capacity => pos_integer(), - start_stream => boolean() + inline_users_in_events => boolean(), + private_attributes => eld_settings:private_attributes(), + stream => boolean(), + polling_interval => pos_integer(), + polling_update_requestor => atom() }. %% Options for starting an SDK instance @@ -39,15 +45,16 @@ start(Tag, SdkKey, Options) -> ok = eld_settings:register(Tag, Settings), % Start instance supervisor SupName = get_ref_from_tag(instance, Tag), - StreamSupName = get_ref_from_tag(instance_stream, Tag), + StartStream = maps:get(stream, Settings), + UpdateSupName = get_ref_from_tag(instance_stream, Tag), + UpdateWorkerModule = get_update_processor(StartStream), EventsSupName = get_ref_from_tag(instance_events, Tag), - {ok, _} = supervisor:start_child(eld_sup, [SupName, StreamSupName, EventsSupName, Tag]), + {ok, _} = supervisor:start_child(eld_sup, [SupName, UpdateSupName, UpdateWorkerModule, EventsSupName, Tag]), % Start storage backend StorageBackend = maps:get(storage_backend, Settings), ok = StorageBackend:init(SupName, Tag, []), % Start stream client - StartStream = maps:get(start_stream, Options, true), - ok = start_stream(StartStream, Tag, SupName). + ok = start_updater(UpdateSupName, UpdateWorkerModule, Tag). %% @doc Stop a client instance %% @@ -57,7 +64,7 @@ stop(Tag) when is_atom(Tag) -> % TODO only stop stream instance if it's running % Terminate stream StreamSupName = get_ref_from_tag(instance_stream, Tag), - ok = eld_stream:stop(StreamSupName), + ok = eld_updater:stop(StreamSupName), % Terminate storage StorageBackend = eld_settings:get_value(Tag, storage_backend), ok = StorageBackend:terminate(Tag), @@ -91,14 +98,19 @@ get_ref_from_tag(instance_stream, Tag) when is_atom(Tag) -> get_ref_from_tag(instance_events, Tag) when is_atom(Tag) -> list_to_atom("eld_instance_events_" ++ atom_to_list(Tag)). -%% @doc Initialize streamer client and start listening +%% @doc Initialize update processor client and start listening %% @private %% %% @end --spec start_stream(StartStream :: boolean(), Tag :: atom(), SupName :: atom()) -> +-spec start_updater(atom(), atom(), atom()) -> ok. -start_stream(false, _, _) -> ok; -start_stream(true, Tag, _SupName) -> - StreamSupName = get_ref_from_tag(instance_stream, Tag), - %{ok, _Pid} = supervisor:start_child(eld_sup, [SupName, StreamSupName]), - ok = eld_stream:start(StreamSupName, Tag). +start_updater(UpdateSupName, UpdateWorkerModule, Tag) -> + ok = eld_updater:start(UpdateSupName, UpdateWorkerModule, Tag). + +%% @doc Get update processor module name depending on settings +%% @private +%% +%% @end +-spec get_update_processor(Stream :: boolean()) -> atom(). +get_update_processor(true) -> eld_update_stream_server; +get_update_processor(false) -> eld_update_poll_server. diff --git a/src/eld_instance_sup.erl b/src/eld_instance_sup.erl index 355f79c..395e7e9 100644 --- a/src/eld_instance_sup.erl +++ b/src/eld_instance_sup.erl @@ -9,7 +9,7 @@ -behaviour(supervisor). %% Supervision --export([start_link/4, init/1]). +-export([start_link/5, init/1]). %% Helper macro for declaring children of supervisor -define(CHILD(Id, Module, Args, Type), {Id, {Module, start_link, Args}, permanent, 5000, Type, [Module]}). @@ -20,30 +20,32 @@ -spec start_link( SupName :: atom(), - StreamSupName :: atom(), + UpdateSupName :: atom(), + UpdateWorkerModule :: atom(), EventSupName :: atom(), Tag :: atom() ) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}. -start_link(SupName, StreamSupName, EventSupName, Tag) -> +start_link(SupName, UpdateSupName, UpdateWorkerModule, EventSupName, Tag) -> error_logger:info_msg("Starting instance supervisor for ~p with name ~p", [Tag, SupName]), - supervisor:start_link({local, SupName}, ?MODULE, [StreamSupName, EventSupName, Tag]). + supervisor:start_link({local, SupName}, ?MODULE, [UpdateSupName, UpdateWorkerModule, EventSupName, Tag]). -spec init(Args :: term()) -> {ok, {{supervisor:strategy(), non_neg_integer(), pos_integer()}, [supervisor:child_spec()]}}. -init([StreamSupName, EventSupName, Tag]) -> - {ok, {{one_for_one, 1, 5}, children(StreamSupName, EventSupName, Tag)}}. +init([UpdateSupName, UpdateWorkerModule, EventSupName, Tag]) -> + {ok, {{one_for_one, 1, 5}, children(UpdateSupName, UpdateWorkerModule, EventSupName, Tag)}}. %%=================================================================== %% Internal functions %%=================================================================== -spec children( - StreamSupName :: atom(), + UpdateSupName :: atom(), + UpdateWorkerModule :: atom(), EventSupName :: atom(), Tag :: atom() ) -> [supervisor:child_spec()]. -children(StreamSupName, EventSupName, Tag) -> - StreamSup = ?CHILD(eld_stream_sup, eld_stream_sup, [StreamSupName], supervisor), +children(UpdateSupName, UpdateWorkerModule, EventSupName, Tag) -> + UpdateSup = ?CHILD(eld_update_sup, eld_update_sup, [UpdateSupName, UpdateWorkerModule], supervisor), EventSup = ?CHILD(eld_event_sup, eld_event_sup, [EventSupName, Tag], supervisor), - [StreamSup, EventSup]. + [UpdateSup, EventSup]. diff --git a/src/eld_rollout.erl b/src/eld_rollout.erl index 63955de..230b573 100644 --- a/src/eld_rollout.erl +++ b/src/eld_rollout.erl @@ -54,7 +54,7 @@ rollout_user(#{variations := WeightedVariations, bucket_by := BucketBy}, #{key : bucket_user(Key, Salt, User, BucketBy) -> UserValue = eld_user:get(BucketBy, User), UserSecondary = eld_user:get(secondary, User), - bucket_user_value(Key, Salt, UserValue, UserSecondary). + bucket_user_value(Key, Salt, bucketable_value(UserValue), UserSecondary). %%=================================================================== %% Internal functions @@ -90,3 +90,9 @@ bucket_hash(Hash) -> Sha1_15 = string:substr(Sha1Hex, 1, 15), Int = list_to_integer(Sha1_15, 16), Int / 1152921504606846975. + +-spec bucketable_value(any()) -> binary() | null. +bucketable_value(V) when is_binary(V) -> V; +bucketable_value(V) when is_integer(V) -> list_to_binary(integer_to_list(V)); +bucketable_value(V) when is_float(V) -> if V == trunc(V) -> list_to_binary(integer_to_list(trunc(V))); true -> null end; +bucketable_value(_) -> null. diff --git a/src/eld_rule.erl b/src/eld_rule.erl index 9737467..08b9d54 100644 --- a/src/eld_rule.erl +++ b/src/eld_rule.erl @@ -14,6 +14,7 @@ -type rule() :: #{ id => binary(), clauses => [eld_clause:clause()], + track_events => boolean(), variation_or_rollout => eld_flag:variation_or_rollout() }. %% Expresses a set of AND-ed matching conditions for a user, along with either @@ -26,10 +27,15 @@ %%=================================================================== -spec new(map()) -> rule(). -new(#{<<"id">> := Id, <<"clauses">> := Clauses, <<"variation">> := Variation}) -> - #{id => Id, clauses => parse_clauses(Clauses), variation_or_rollout => Variation}; -new(#{<<"id">> := Id, <<"clauses">> := Clauses, <<"rollout">> := Rollout}) -> - #{id => Id, clauses => parse_clauses(Clauses), variation_or_rollout => parse_rollout(Rollout)}. +new(#{<<"id">> := Id, <<"clauses">> := Clauses, <<"trackEvents">> := TrackEvents, <<"variation">> := Variation}) -> + #{id => Id, clauses => parse_clauses(Clauses), track_events => TrackEvents, variation_or_rollout => Variation}; +new(#{<<"id">> := Id, <<"clauses">> := Clauses, <<"trackEvents">> := TrackEvents, <<"rollout">> := Rollout}) -> + #{ + id => Id, + clauses => parse_clauses(Clauses), + track_events => TrackEvents, + variation_or_rollout => parse_rollout(Rollout) + }. %% @doc Match all clauses to user, includes segment_match %% diff --git a/src/eld_settings.erl b/src/eld_settings.erl index 3d92237..197fda5 100644 --- a/src/eld_settings.erl +++ b/src/eld_settings.erl @@ -29,7 +29,10 @@ events_dispatcher => atom(), user_keys_capacity => pos_integer(), inline_users_in_events => boolean(), - private_attributes => private_attributes() + private_attributes => private_attributes(), + stream => boolean(), + polling_interval => pos_integer(), + polling_update_requestor => atom() }. % Settings stored for each running SDK instance @@ -37,11 +40,10 @@ -export_type([private_attributes/0]). - %% Constants -define(DEFAULT_BASE_URI, "https://app.launchdarkly.com"). --define(DEFAULT_EVENTS_URI, "https://events.launchdarkly.com/api/events/bulk"). --define(DEFAULT_STREAM_URI, "https://stream.launchdarkly.com/all"). +-define(DEFAULT_EVENTS_URI, "https://events.launchdarkly.com"). +-define(DEFAULT_STREAM_URI, "https://stream.launchdarkly.com"). -define(DEFAULT_STORAGE_BACKEND, eld_storage_ets). -define(DEFAULT_EVENTS_CAPACITY, 10000). -define(DEFAULT_EVENTS_FLUSH_INTERVAL, 30000). @@ -49,8 +51,11 @@ -define(DEFAULT_USER_KEYS_CAPACITY, 1000). -define(DEFAULT_INLINE_USERS_IN_EVENTS, false). -define(DEFAULT_PRIVATE_ATTRIBUTES, []). +-define(DEFAULT_STREAM, true). +-define(DEFAULT_POLLING_UPDATE_REQUESTOR, eld_update_requestor_httpc). +-define(MINIMUM_POLLING_INTERVAL, 30). -define(USER_AGENT, "ErlangClient"). --define(VERSION, "1.0.0-alpha3"). +-define(VERSION, "1.0.0-alpha4"). -define(EVENT_SCHEMA, "3"). %%=================================================================== @@ -81,6 +86,12 @@ parse_options(SdkKey, Options) when is_list(SdkKey), is_map(Options) -> UserKeysCapacity = maps:get(user_keys_capacity, Options, ?DEFAULT_USER_KEYS_CAPACITY), InlineUsersInEvents = maps:get(inline_users_in_events, Options, ?DEFAULT_INLINE_USERS_IN_EVENTS), PrivateAttributes = maps:get(private_attributes, Options, ?DEFAULT_PRIVATE_ATTRIBUTES), + Stream = maps:get(stream, Options, ?DEFAULT_STREAM), + PollingUpdateRequestor = maps:get(polling_update_requestor, Options, ?DEFAULT_POLLING_UPDATE_REQUESTOR), + PollingInterval = lists:max([ + ?MINIMUM_POLLING_INTERVAL, + maps:get(polling_interval, Options, ?MINIMUM_POLLING_INTERVAL) + ]), #{ sdk_key => SdkKey, base_uri => BaseUri, @@ -92,7 +103,10 @@ parse_options(SdkKey, Options) when is_list(SdkKey), is_map(Options) -> events_dispatcher => EventsDispatcher, user_keys_capacity => UserKeysCapacity, inline_users_in_events => InlineUsersInEvents, - private_attributes => PrivateAttributes + private_attributes => PrivateAttributes, + stream => Stream, + polling_update_requestor => PollingUpdateRequestor, + polling_interval => PollingInterval }. %% @doc Get all registered tags diff --git a/src/eld_storage_engine.erl b/src/eld_storage_engine.erl index c84c604..89f999f 100644 --- a/src/eld_storage_engine.erl +++ b/src/eld_storage_engine.erl @@ -29,6 +29,7 @@ | {error, already_exists, string()}. %% `empty' must delete all records from the specified bucket. +%% If the bucket doesn't exist, it must return `bucket_not_found' error. -callback empty(Tag :: atom(), Bucket :: atom()) -> ok | {error, bucket_not_found, string()}. @@ -52,6 +53,11 @@ ok | {error, bucket_not_found, string()}. +%% `put_clean' must perform `empty' and `put' atomically on a given bucket. +-callback put_clean(Tag :: atom(), Bucket :: atom(), Item :: #{Key :: binary() => Value :: any()}) -> + ok + | {error, bucket_not_found, string()}. + %% `terminate' is the opposite of `init'. It is expected to clean up any %% resources and fully shut down the storage backend. -callback terminate(Tag :: atom()) -> ok. diff --git a/src/eld_storage_ets.erl b/src/eld_storage_ets.erl index 59b3c3c..5de49bd 100644 --- a/src/eld_storage_ets.erl +++ b/src/eld_storage_ets.erl @@ -19,6 +19,7 @@ -export([get/3]). -export([list/2]). -export([put/3]). +-export([put_clean/3]). -export([terminate/1]). %%=================================================================== @@ -71,6 +72,13 @@ put(Tag, Bucket, Items) -> ServerRef = get_local_reg_name(worker, Tag), eld_storage_ets_server:put(ServerRef, Bucket, Items). +-spec put_clean(Tag :: atom(), Bucket :: atom(), Items :: #{Key :: binary() => Value :: any()}) -> + ok | + {error, bucket_not_found, string()}. +put_clean(Tag, Bucket, Items) -> + ServerRef = get_local_reg_name(worker, Tag), + eld_storage_ets_server:put_clean(ServerRef, Bucket, Items). + -spec terminate(Tag :: atom()) -> ok. terminate(_Tag) -> ok. diff --git a/src/eld_storage_ets_server.erl b/src/eld_storage_ets_server.erl index 979fc25..6daea4e 100644 --- a/src/eld_storage_ets_server.erl +++ b/src/eld_storage_ets_server.erl @@ -21,6 +21,7 @@ -export([get/3]). -export([list/2]). -export([put/3]). +-export([put_clean/3]). %% Types -type state() :: #{ @@ -78,7 +79,7 @@ get(ServerRef, Bucket, Key) when is_atom(Bucket), is_binary(Key) -> list(ServerRef, Bucket) when is_atom(Bucket) -> gen_server:call(ServerRef, {list, Bucket}). -%% @doc Put an item key value pair in an existing bucket +%% @doc Put item key value pairs in an existing bucket %% %% @end -spec put(ServerRef :: atom(), Bucket :: atom(), Items :: #{Key :: binary() => Value :: any()}) -> @@ -87,6 +88,15 @@ list(ServerRef, Bucket) when is_atom(Bucket) -> put(ServerRef, Bucket, Items) when is_atom(Bucket), is_map(Items) -> ok = gen_server:call(ServerRef, {put, Bucket, Items}). +%% @doc Perform an atomic empty and put +%% +%% @end +-spec put_clean(ServerRef :: atom(), Bucket :: atom(), Items :: #{Key :: binary() => Value :: any()}) -> + ok | + {error, bucket_not_found, string()}. +put_clean(ServerRef, Bucket, Items) when is_atom(Bucket), is_map(Items) -> + ok = gen_server:call(ServerRef, {put_clean, Bucket, Items}). + %%=================================================================== %% Behavior callbacks %%=================================================================== @@ -104,7 +114,9 @@ handle_call({get, Bucket, Key}, _From, #{tids := Tids} = State) -> handle_call({list, Bucket}, _From, #{tids := Tids} = State) -> {reply, list_items(bucket_exists(Bucket, Tids), Bucket, Tids), State}; handle_call({put, Bucket, Item}, _From, #{tids := Tids} = State) -> - {reply, put_items(bucket_exists(Bucket, Tids), Item, Bucket, Tids), State}. + {reply, put_items(bucket_exists(Bucket, Tids), Item, Bucket, Tids), State}; +handle_call({put_clean, Bucket, Item}, _From, #{tids := Tids} = State) -> + {reply, put_clean_items(bucket_exists(Bucket, Tids), Item, Bucket, Tids), State}. handle_cast(_Msg, State) -> {noreply, State}. @@ -182,7 +194,7 @@ lookup_key(true, Key, Bucket, Tids) -> Tid = maps:get(Bucket, Tids), ets:lookup(Tid, Key). -%% @doc Put a key value pair in bucket +%% @doc Put key value pairs in bucket %% @private %% %% @end @@ -195,3 +207,18 @@ put_items(true, Items, Bucket, Tids) -> Tid = maps:get(Bucket, Tids), true = ets:insert(Tid, maps:to_list(Items)), ok. + +%% @doc Empty bucket and put key value pairs +%% @private +%% +%% @end +-spec put_clean_items(BucketExists :: boolean(), Items :: #{Key :: binary() => Value :: any()}, Bucket :: atom(), Tids :: map()) -> + ok | + {error, bucket_not_found, string()}. +put_clean_items(false, _Items, Bucket, _Tids) -> + {error, bucket_not_found, "ETS table " ++ atom_to_list(Bucket) ++ " does not exist."}; +put_clean_items(true, Items, Bucket, Tids) -> + Tid = maps:get(Bucket, Tids), + true = ets:delete_all_objects(Tid), + true = ets:insert(Tid, maps:to_list(Items)), + ok. diff --git a/src/eld_storage_map.erl b/src/eld_storage_map.erl index 693fea2..22c692b 100644 --- a/src/eld_storage_map.erl +++ b/src/eld_storage_map.erl @@ -19,6 +19,7 @@ -export([get/3]). -export([list/2]). -export([put/3]). +-export([put_clean/3]). -export([terminate/1]). %%=================================================================== @@ -71,6 +72,13 @@ put(Tag, Bucket, Items) -> ServerRef = get_local_reg_name(worker, Tag), eld_storage_map_server:put(ServerRef, Bucket, Items). +-spec put_clean(Tag :: atom(), Bucket :: atom(), Items :: #{Key :: binary() => Value :: any()}) -> + ok | + {error, bucket_not_found, string()}. +put_clean(Tag, Bucket, Items) -> + ServerRef = get_local_reg_name(worker, Tag), + eld_storage_map_server:put_clean(ServerRef, Bucket, Items). + -spec terminate(Tag :: atom()) -> ok. terminate(_Tag) -> ok. diff --git a/src/eld_storage_map_server.erl b/src/eld_storage_map_server.erl index f8da782..ffaba0b 100644 --- a/src/eld_storage_map_server.erl +++ b/src/eld_storage_map_server.erl @@ -22,6 +22,7 @@ -export([get/3]). -export([list/2]). -export([put/3]). +-export([put_clean/3]). %% Types -type state() :: #{data => map()}. @@ -77,7 +78,7 @@ get(ServerRef, Bucket, Key) when is_atom(Bucket), is_binary(Key) -> list(ServerRef, Bucket) when is_atom(Bucket) -> gen_server:call(ServerRef, {list, Bucket}). -%% @doc Put an item key value pair in an existing bucket +%% @doc Put item key value pairs in an existing bucket %% %% @end -spec put(ServerRef :: atom(), Bucket :: atom(), Items :: #{Key :: binary() => Value :: any()}) -> @@ -86,6 +87,15 @@ list(ServerRef, Bucket) when is_atom(Bucket) -> put(ServerRef, Bucket, Items) when is_atom(Bucket), is_map(Items) -> ok = gen_server:call(ServerRef, {put, Bucket, Items}). +%% @doc Perform an atomic empty and put +%% +%% @end +-spec put_clean(ServerRef :: atom(), Bucket :: atom(), Items :: #{Key :: binary() => Value :: any()}) -> + ok | + {error, bucket_not_found, string()}. +put_clean(ServerRef, Bucket, Items) when is_atom(Bucket), is_map(Items) -> + ok = gen_server:call(ServerRef, {put_clean, Bucket, Items}). + %%=================================================================== %% Behavior callbacks %%=================================================================== @@ -116,8 +126,16 @@ handle_call({put, Bucket, Items}, _From, #{data := Data} = State) -> {reply, {error, bucket_not_found, Error}, State}; {ok, NewData} -> {reply, ok, maps:update(data, NewData, State)} + end; +handle_call({put_clean, Bucket, Items}, _From, #{data := Data} = State) -> + case put_clean_items(Items, Bucket, Data) of + {error, bucket_not_found, Error} -> + {reply, {error, bucket_not_found, Error}, State}; + {ok, NewData} -> + {reply, ok, maps:update(data, NewData, State)} end. + handle_cast(_Msg, State) -> {noreply, State}. handle_info(_Msg, State) -> {noreply, State}. @@ -207,7 +225,7 @@ lookup_key(Key, Bucket, Data) when is_atom(Bucket), is_binary(Key) -> end end. -%% @doc Put a key value pair in bucket +%% @doc Put key value pairs in bucket %% @private %% %% @end @@ -223,3 +241,18 @@ put_items(Items, Bucket, Data) when is_map(Items), is_atom(Bucket) -> NewBucketData = maps:merge(BucketData, Items), {ok, maps:update(Bucket, NewBucketData, Data)} end. + +%% @doc Empty bucket and put key value pairs +%% @private +%% +%% @end +-spec put_clean_items(Items :: #{Key :: binary() => Value :: any()}, Bucket :: atom(), Data :: map()) -> + {ok, NewData :: map()} | + {error, bucket_not_found, string()}. +put_clean_items(Items, Bucket, Data) when is_map(Items), is_atom(Bucket) -> + case bucket_exists(Bucket, Data) of + false -> + {error, bucket_not_found, "Map " ++ atom_to_list(Bucket) ++ " does not exist."}; + _ -> + {ok, maps:update(Bucket, Items, Data)} + end. diff --git a/src/eld_update_poll_server.erl b/src/eld_update_poll_server.erl new file mode 100644 index 0000000..3198e8e --- /dev/null +++ b/src/eld_update_poll_server.erl @@ -0,0 +1,177 @@ +%%------------------------------------------------------------------- +%% @doc Polling server +%% +%% @end +%%------------------------------------------------------------------- + +-module(eld_update_poll_server). + +-behaviour(gen_server). +-behaviour(eld_update_server). + +%% Supervision +-export([start_link/1, init/1]). + +%% Behavior callbacks +-export([code_change/3, handle_call/3, handle_cast/2, handle_info/2, terminate/2]). + +%% API +-export([listen/1]). + +-type state() :: #{ + sdk_key := string(), + storage_backend := atom(), + storage_tag := atom(), + requestor := atom(), + poll_uri := string(), + poll_interval := pos_integer(), + last_response_hash := binary(), + timer_ref => reference() +}. + +%% Constants +-define(LATEST_ALL_PATH, "/sdk/latest-all"). + +-ifdef(TEST). +-compile(export_all). +-endif. + +%%=================================================================== +%% API +%%=================================================================== + +%% @doc Start listening to streaming events +%% +%% @end +-spec listen(Pid :: pid()) -> + ok. +listen(Pid) -> + gen_server:call(Pid, {listen}). + +%%=================================================================== +%% Supervision +%%=================================================================== + +%% @doc Starts the server +%% +%% @end +-spec start_link(Tag :: atom()) -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}. +start_link(Tag) -> + error_logger:info_msg("Starting polling update server for ~p", [Tag]), + gen_server:start_link(?MODULE, [Tag], []). + +-spec init(Args :: term()) -> + {ok, State :: state()} | {ok, State :: state(), timeout() | hibernate} | + {stop, Reason :: term()} | ignore. +init([Tag]) -> + SdkKey = eld_settings:get_value(Tag, sdk_key), + StorageBackend = eld_settings:get_value(Tag, storage_backend), + Requestor = eld_settings:get_value(Tag, polling_update_requestor), + PollUri = eld_settings:get_value(Tag, base_uri) ++ ?LATEST_ALL_PATH, + PollInterval = eld_settings:get_value(Tag, polling_interval) * 1000, + % Need to trap exit so supervisor:terminate_child calls terminate callback + process_flag(trap_exit, true), + State = #{ + sdk_key => SdkKey, + storage_backend => StorageBackend, + storage_tag => Tag, + requestor => Requestor, + poll_uri => PollUri, + poll_interval => PollInterval, + last_response_hash => <<>> + }, + {ok, State}. + +%%=================================================================== +%% Behavior callbacks +%%=================================================================== + +-type from() :: {pid(), term()}. +-spec handle_call(Request :: term(), From :: from(), State :: state()) -> + {reply, Reply :: term(), NewState :: state()} | + {stop, normal, {error, atom(), term()}, state()}. +handle_call({listen}, _From, + #{ + sdk_key := SdkKey, + storage_backend := StorageBackend, + storage_tag := Tag, + requestor := Requestor, + poll_uri := Uri, + poll_interval := PollInterval, + last_response_hash := LastHash + } = State) -> + {ok, NewHash} = poll(SdkKey, StorageBackend, Requestor, Uri, LastHash, Tag), + TimerRef = erlang:send_after(PollInterval, self(), {poll}), + NewState = State#{timer_ref => TimerRef, last_response_hash := NewHash}, + {reply, ok, NewState}. + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info({poll}, + #{ + sdk_key := SdkKey, + storage_backend := StorageBackend, + storage_tag := Tag, + requestor := Requestor, + poll_uri := Uri, + poll_interval := PollInterval, + last_response_hash := LastHash + } = State) -> + {ok, NewHash} = poll(SdkKey, StorageBackend, Requestor, Uri, LastHash, Tag), + TimerRef = erlang:send_after(PollInterval, self(), {poll}), + {noreply, State#{timer_ref := TimerRef, last_response_hash := NewHash}}; +handle_info(_Info, State) -> + {noreply, State}. + +-spec terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: state()) -> term(). +terminate(Reason, #{timer_ref := TimerRef} = _State) -> + error_logger:info_msg("Terminating polling, reason: ~p", [Reason]), + _ = erlang:cancel_timer(TimerRef), + ok; +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%=================================================================== +%% Internal functions +%%=================================================================== + +-spec poll(string(), atom(), atom(), string(), binary(), atom()) -> {ok, binary()}. +poll(SdkKey, StorageBackend, Requestor, Uri, LastHash, Tag) -> + process_response(Requestor:all(Uri, SdkKey), StorageBackend, LastHash, Tag, Uri). + +-spec process_response(eld_update_requestor:response(), atom(), binary(), atom(), string()) -> {ok, binary()}. +process_response({error, 401, _Reason}, _, LastHash, _, Uri) -> + error_logger:warning_msg("Invalid SDK key when when polling for updates at URL ~p. Verify that your SDK key is correct.", [Uri]), + {ok, LastHash}; +process_response({error, 404, _Reason}, _, LastHash, _, Uri) -> + error_logger:warning_msg("Resource not found when polling for updates at URL ~p.", [Uri]), + {ok, LastHash}; +process_response({error, StatusCode, Reason}, _, LastHash, _, Uri) when StatusCode >= 300 -> + error_logger:warning_msg("Unexpected response code: ~p when polling for updates at URL ~p: ~p.", [StatusCode, Uri, Reason]), + {ok, LastHash}; +process_response({ok, ResponseBody}, StorageBackend, LastHash, Tag, _) -> + NewHash = crypto:hash(sha, ResponseBody), + process_response_body_last_hash(ResponseBody, StorageBackend, LastHash, NewHash, Tag). + +-spec process_response_body_last_hash(binary(), atom(), binary(), binary(), atom()) -> {ok, binary()}. +process_response_body_last_hash(_ResponseBody, _StorageBackend, Hash, Hash, _Tag) -> {ok, Hash}; +process_response_body_last_hash(ResponseBody, StorageBackend, _, NewHash, Tag) -> + process_response_body(ResponseBody, StorageBackend, NewHash, Tag). + +-spec process_response_body(binary(), atom(), binary(), atom()) -> {ok, binary()}. +process_response_body(ResponseBody, StorageBackend, NewHash, Tag) -> + Data = jsx:decode(ResponseBody, [return_maps]), + [Flags, Segments] = get_put_items(Data), + ok = StorageBackend:put(Tag, flags, Flags), + ok = StorageBackend:put(Tag, segments, Segments), + {ok, NewHash}. + +-spec get_put_items(Data :: map()) -> [map()]. +get_put_items(#{<<"flags">> := Flags, <<"segments">> := Segments}) -> + [Flags, Segments]. diff --git a/src/eld_update_requestor.erl b/src/eld_update_requestor.erl new file mode 100644 index 0000000..8288dac --- /dev/null +++ b/src/eld_update_requestor.erl @@ -0,0 +1,17 @@ +%%------------------------------------------------------------------- +%% @doc `eld_update_requestor' module +%% +%% This is a behavior that event dispatchers must implement. It is used to send +%% event batches to LaunchDarkly. +%% @end +%%------------------------------------------------------------------- + +-module(eld_update_requestor). + +-type response() :: {ok, binary()} | {error, httpc:status_code(), string()}. + +-export_type([response/0]). + +%% `all' must request and return all flags and segments. It takes the +%% destination URI and SDK key. +-callback all(Uri :: string(), SdkKey :: string()) -> response(). diff --git a/src/eld_update_requestor_httpc.erl b/src/eld_update_requestor_httpc.erl new file mode 100644 index 0000000..2728b3d --- /dev/null +++ b/src/eld_update_requestor_httpc.erl @@ -0,0 +1,43 @@ +%%------------------------------------------------------------------- +%% @doc Polling update requestor +%% +%% @end +%%------------------------------------------------------------------- + +-module(eld_update_requestor_httpc). + +-behaviour(eld_update_requestor). + +%% Behavior callbacks +-export([all/2]). + +%%=================================================================== +%% Behavior callbacks +%%=================================================================== + +%% @doc Send events to LaunchDarkly event server +%% +%% @end +-spec all(Uri :: string(), SdkKey :: string()) -> eld_update_requestor:response(). +all(Uri, SdkKey) -> + Headers = [ + {"Authorization", SdkKey}, + {"X-LaunchDarkly-Event-Schema", eld_settings:get_event_schema()}, + {"User-Agent", eld_settings:get_user_agent()} + ], + {ok, {{Version, StatusCode, ReasonPhrase}, _Headers, Body}} = + httpc:request(get, {Uri, Headers}, [], [{body_format, binary}]), + Result = if + StatusCode < 400 -> {ok, Body}; + true -> {error, StatusCode, format_response(Version, StatusCode, ReasonPhrase)} + end, + Result. + +%%=================================================================== +%% Internal functions +%%=================================================================== + +-spec format_response(Version :: string(), StatusCode :: integer(), ReasonPhrase :: string()) -> + string(). +format_response(Version, StatusCode, ReasonPhrase) -> + io_lib:format("~s ~b ~s", [Version, StatusCode, ReasonPhrase]). diff --git a/src/eld_update_server.erl b/src/eld_update_server.erl new file mode 100644 index 0000000..a95562e --- /dev/null +++ b/src/eld_update_server.erl @@ -0,0 +1,11 @@ +%%------------------------------------------------------------------- +%% @doc `eld_update_server' module +%% +%% This is a behavior that update processors must implement. +%% @end +%%------------------------------------------------------------------- + +-module(eld_update_server). + +%% `listen' must make the worker start listening for flag and segment updates. +-callback listen(Pid :: pid()) -> ok. diff --git a/src/eld_stream_server.erl b/src/eld_update_stream_server.erl similarity index 94% rename from src/eld_stream_server.erl rename to src/eld_update_stream_server.erl index 3619077..8fc5ab1 100644 --- a/src/eld_stream_server.erl +++ b/src/eld_update_stream_server.erl @@ -4,9 +4,10 @@ %% @end %%------------------------------------------------------------------- --module(eld_stream_server). +-module(eld_update_stream_server). -behaviour(gen_server). +-behaviour(eld_update_server). %% Supervision -export([start_link/1, init/1]). @@ -18,14 +19,13 @@ -export([listen/1]). -type state() :: #{ - key => atom(), - conn => pid() | undefined, - backoff => backoff:backoff(), - last_attempted => non_neg_integer(), - sdk_key => string(), - storage_backend => atom(), - storage_tag => atom(), - stream_uri => string() + conn := pid() | undefined, + backoff := backoff:backoff(), + last_attempted := non_neg_integer(), + sdk_key := string(), + storage_backend := atom(), + storage_tag := atom(), + stream_uri := string() }. -ifdef(TEST). @@ -54,6 +54,7 @@ listen(Pid) -> -spec start_link(Tag :: atom()) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}. start_link(Tag) -> + error_logger:info_msg("Starting streaming update server for ~p", [Tag]), gen_server:start_link(?MODULE, [Tag], []). -spec init(Args :: term()) -> @@ -61,7 +62,7 @@ start_link(Tag) -> {stop, Reason :: term()} | ignore. init([Tag]) -> SdkKey = eld_settings:get_value(Tag, sdk_key), - StreamUri = eld_settings:get_value(Tag, stream_uri), + StreamUri = eld_settings:get_value(Tag, stream_uri) ++ "/all", StorageBackend = eld_settings:get_value(Tag, storage_backend), Backoff = backoff:type(backoff:init(100, 30000, self(), listen), jitter), % Need to trap exit so supervisor:terminate_child calls terminate callback @@ -111,7 +112,7 @@ terminate(Reason, #{conn := undefined} = _State) -> error_logger:info_msg("Terminating, reason: ~p; Pid none~n", [Reason]), ok; terminate(Reason, #{conn := ShotgunPid} = _State) -> - error_logger:info_msg("Terminating, reason: ~p; Pid ~p~n", [Reason, ShotgunPid]), + error_logger:info_msg("Terminating streaming connection, reason: ~p; Pid ~p~n", [Reason, ShotgunPid]), ok = shotgun:close(ShotgunPid). code_change(_OldVsn, State, _Extra) -> @@ -211,8 +212,8 @@ get_event_operation(<<"patch">>) -> patch. process_items(put, Data, StorageBackend, Tag) -> [Flags, Segments] = get_put_items(Data), error_logger:info_msg("Received event with ~p flags and ~p segments", [maps:size(Flags), maps:size(Segments)]), - ok = StorageBackend:put(Tag, flags, Flags), - ok = StorageBackend:put(Tag, segments, Segments); + ok = StorageBackend:put_clean(Tag, flags, Flags), + ok = StorageBackend:put_clean(Tag, segments, Segments); process_items(patch, Data, StorageBackend, Tag) -> {Bucket, Key, Item} = get_patch_item(Data), ok = maybe_patch_item(StorageBackend, Tag, Bucket, Key, Item); diff --git a/src/eld_stream_sup.erl b/src/eld_update_sup.erl similarity index 63% rename from src/eld_stream_sup.erl rename to src/eld_update_sup.erl index 7fa2508..5570438 100644 --- a/src/eld_stream_sup.erl +++ b/src/eld_update_sup.erl @@ -1,36 +1,36 @@ %%------------------------------------------------------------------- -%% @doc Stream supervisor +%% @doc Update processor supervisor %% %% @end %%------------------------------------------------------------------- --module(eld_stream_sup). +-module(eld_update_sup). -behaviour(supervisor). %% Supervision --export([start_link/1, init/1]). +-export([start_link/2, init/1]). %%=================================================================== %% Supervision %%=================================================================== --spec start_link(StreamSupName :: atom()) -> +-spec start_link(UpdateSupName :: atom(), UpdateWorkerModule :: atom()) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}. -start_link(StreamSupName) -> - supervisor:start_link({local, StreamSupName}, ?MODULE, []). +start_link(UpdateSupName, UpdateWorkerModule) -> + supervisor:start_link({local, UpdateSupName}, ?MODULE, [UpdateWorkerModule]). -spec init(Args :: term()) -> {ok, {{supervisor:strategy(), non_neg_integer(), pos_integer()}, [supervisor:child_spec()]}}. -init([]) -> +init([UpdateWorkerModule]) -> MaxRestart = 10, MaxTime = 3600, ChildSpec = { - eld_stream_server, - {eld_stream_server, start_link, []}, + UpdateWorkerModule, + {UpdateWorkerModule, start_link, []}, permanent, 5000, % shutdown time worker, - [eld_stream_server] + [UpdateWorkerModule] }, {ok, {{simple_one_for_one, MaxRestart, MaxTime}, [ChildSpec]}}. diff --git a/src/eld_stream.erl b/src/eld_updater.erl similarity index 81% rename from src/eld_stream.erl rename to src/eld_updater.erl index d5aeccd..01c8276 100644 --- a/src/eld_stream.erl +++ b/src/eld_updater.erl @@ -1,14 +1,14 @@ %%------------------------------------------------------------------- -%% @doc `eld_stream' module +%% @doc `eld_updater' module %% %% Used to start and stop client stream listener. %% @end %%------------------------------------------------------------------- --module(eld_stream). +-module(eld_updater). %% API --export([start/2]). +-export([start/3]). -export([stop/1]). %% Constants @@ -21,14 +21,14 @@ %% @doc Start client stream listener %% %% @end --spec start(StreamSupName :: atom(), Tag :: atom()) -> +-spec start(UpdateSupName :: atom(), UpdateWorkerModule :: atom(), Tag :: atom()) -> ok | {error, gun_open_failed, term()} | {error, gun_open_timeout, term()} | {error, get_request_failed, term()}. -start(StreamSupName, Tag) when is_atom(StreamSupName) -> - {ok, Pid} = supervisor:start_child(StreamSupName, [Tag]), - eld_stream_server:listen(Pid). +start(UpdateSupName, UpdateWorkerModule, Tag) when is_atom(UpdateSupName), is_atom(UpdateWorkerModule) -> + {ok, Pid} = supervisor:start_child(UpdateSupName, [Tag]), + UpdateWorkerModule:listen(Pid). %% @doc Stop client stream listener %% diff --git a/test/eld_eval_SUITE.erl b/test/eld_eval_SUITE.erl index f907061..d433ab4 100644 --- a/test/eld_eval_SUITE.erl +++ b/test/eld_eval_SUITE.erl @@ -49,6 +49,9 @@ rule_nomatch_in_negated_null_attribute/1, fallthrough_rollout/1, fallthrough_rollout_custom/1, + fallthrough_rollout_custom_integer/1, + fallthrough_rollout_custom_float/1, + fallthrough_rollout_custom_float_invalid/1, variation_out_of_range/1, extra_fields/1 ]). @@ -97,14 +100,21 @@ all() -> rule_nomatch_in_negated_null_attribute, fallthrough_rollout, fallthrough_rollout_custom, + fallthrough_rollout_custom_integer, + fallthrough_rollout_custom_float, + fallthrough_rollout_custom_float_invalid, variation_out_of_range, extra_fields ]. init_per_suite(Config) -> {ok, _} = application:ensure_all_started(eld), - eld:start_instance("", #{start_stream => false}), - eld:start_instance("", another1, #{start_stream => false}), + Options = #{ + stream => false, + polling_update_requestor => eld_update_requestor_test + }, + eld:start_instance("", Options), + eld:start_instance("", another1, Options), ok = create_flags(), Config. @@ -126,8 +136,8 @@ create_flags() -> DataFilename2 = code:priv_dir(eld) ++ "/flags-segments-put-data-another1.json", {ok, PutData} = file:read_file(DataFilename), {ok, PutData2} = file:read_file(DataFilename2), - ok = eld_stream_server:process_event(#{event => <<"put">>, data => PutData}, eld_storage_ets, default), - ok = eld_stream_server:process_event(#{event => <<"put">>, data => PutData2}, eld_storage_ets, another1). + ok = eld_update_stream_server:process_event(#{event => <<"put">>, data => PutData}, eld_storage_ets, default), + ok = eld_update_stream_server:process_event(#{event => <<"put">>, data => PutData2}, eld_storage_ets, another1). extract_events(Events) -> [ @@ -584,6 +594,54 @@ fallthrough_rollout_custom(_) -> ActualEvents = lists:sort(extract_events(Events)), ExpectedEvents = ActualEvents. +fallthrough_rollout_custom_integer(_) -> + ExpectedReason = fallthrough, + User = #{ + key => <<"user-foo">>, + custom => #{ + <<"customProp">> => 514343 + } + }, + {{4, <<"e">>, ExpectedReason}, Events} = + eld_eval:flag_key_for_user(default, <<"roll-me-custom">>, User, "foo"), + ExpectedEvents = lists:sort([ + {<<"roll-me-custom">>, feature_request, 4, <<"e">>, "foo", ExpectedReason, null} + ]), + ActualEvents = lists:sort(extract_events(Events)), + ExpectedEvents = ActualEvents. + +fallthrough_rollout_custom_float(_) -> + ExpectedReason = fallthrough, + User = #{ + key => <<"user-foo">>, + custom => #{ + <<"customProp">> => 514343.0 + } + }, + {{4, <<"e">>, ExpectedReason}, Events} = + eld_eval:flag_key_for_user(default, <<"roll-me-custom">>, User, "foo"), + ExpectedEvents = lists:sort([ + {<<"roll-me-custom">>, feature_request, 4, <<"e">>, "foo", ExpectedReason, null} + ]), + ActualEvents = lists:sort(extract_events(Events)), + ExpectedEvents = ActualEvents. + +fallthrough_rollout_custom_float_invalid(_) -> + ExpectedReason = fallthrough, + User = #{ + key => <<"user-foo">>, + custom => #{ + <<"customProp">> => 514343.05 + } + }, + {{0, <<"a">>, ExpectedReason}, Events} = + eld_eval:flag_key_for_user(default, <<"roll-me-custom">>, User, "foo"), + ExpectedEvents = lists:sort([ + {<<"roll-me-custom">>, feature_request, 0, <<"a">>, "foo", ExpectedReason, null} + ]), + ActualEvents = lists:sort(extract_events(Events)), + ExpectedEvents = ActualEvents. + variation_out_of_range(_) -> {{null, null, {error, malformed_flag}}, Events} = eld_eval:flag_key_for_user(default, <<"bad-variation">>, #{key => <<"some-user">>}, "foo"), diff --git a/test/eld_events_SUITE.erl b/test/eld_events_SUITE.erl index edfd927..49f270c 100644 --- a/test/eld_events_SUITE.erl +++ b/test/eld_events_SUITE.erl @@ -13,6 +13,8 @@ -export([ add_flag_eval_events_flush_with_track/1, add_flag_eval_events_flush_with_track_no_reasons/1, + add_flag_eval_events_flush_with_track_experimentation_rule/1, + add_flag_eval_events_flush_with_track_experimentation_fallthrough/1, add_flag_eval_events_flush_with_track_inline/1, add_flag_eval_events_flush_no_track/1, add_flag_eval_events_with_debug/1, @@ -31,6 +33,8 @@ all() -> [ add_flag_eval_events_flush_with_track, add_flag_eval_events_flush_with_track_no_reasons, + add_flag_eval_events_flush_with_track_experimentation_rule, + add_flag_eval_events_flush_with_track_experimentation_fallthrough, add_flag_eval_events_flush_with_track_inline, add_flag_eval_events_flush_no_track, add_flag_eval_events_with_debug, @@ -43,20 +47,27 @@ all() -> init_per_suite(Config) -> {ok, _} = application:ensure_all_started(eld), - eld:start_instance("", #{start_stream => false, events_dispatcher => eld_event_dispatch_test}), + Options = #{ + stream => false, + events_dispatcher => eld_event_dispatch_test, + polling_update_requestor => eld_update_requestor_test + }, + eld:start_instance("", Options), Another1Options = #{ - start_stream => false, + stream => false, events_capacity => 2, events_dispatcher => eld_event_dispatch_test, - events_flush_interval => 1000 + events_flush_interval => 1000, + polling_update_requestor => eld_update_requestor_test }, eld:start_instance("", another1, Another1Options), - InlinerOptions = #{ - start_stream => false, + InlineOptions = #{ + stream => false, events_dispatcher => eld_event_dispatch_test, - inline_users_in_events => true + inline_users_in_events => true, + polling_update_requestor => eld_update_requestor_test }, - eld:start_instance("", inliner, InlinerOptions), + eld:start_instance("", inliner, InlineOptions), Config. end_per_suite(_) -> @@ -89,6 +100,7 @@ get_simple_flag_track() -> <<"sel">> => <<"8b4d79c59adb4df492ebea0bf65dfd4c">>, <<"targets">> => [], <<"trackEvents">> => true, + <<"trackEventsFallthrough">> => false, <<"variations">> => [true,false], <<"version">> => 5 } @@ -111,6 +123,58 @@ get_simple_flag_no_track() -> <<"sel">> => <<"8b4d79c59adb4df492ebea0bf65dfd4c">>, <<"targets">> => [], <<"trackEvents">> => false, + <<"trackEventsFallthrough">> => false, + <<"variations">> => [true,false], + <<"version">> => 5 + } + }. + +get_simple_flag_experimentation_rule() -> + { + <<"abc">>, + #{ + <<"clientSide">> => false, + <<"debugEventsUntilDate">> => null, + <<"deleted">> => false, + <<"fallthrough">> => #{<<"variation">> => 0}, + <<"key">> => <<"abc">>, + <<"offVariation">> => 1, + <<"on">> => true, + <<"prerequisites">> => [], + <<"rules">> => [#{ + <<"clauses">> => [], + <<"id">> => <<"12345">>, + <<"variation">> => 0, + <<"trackEvents">> => true + }], + <<"salt">> => <<"d0888ec5921e45c7af5bc10b47b033ba">>, + <<"sel">> => <<"8b4d79c59adb4df492ebea0bf65dfd4c">>, + <<"targets">> => [], + <<"trackEvents">> => false, + <<"trackEventsFallthrough">> => false, + <<"variations">> => [true,false], + <<"version">> => 5 + } + }. + +get_simple_flag_experimentation_fallthrough() -> + { + <<"abc">>, + #{ + <<"clientSide">> => false, + <<"debugEventsUntilDate">> => null, + <<"deleted">> => false, + <<"fallthrough">> => #{<<"variation">> => 0}, + <<"key">> => <<"abc">>, + <<"offVariation">> => 1, + <<"on">> => true, + <<"prerequisites">> => [], + <<"rules">> => [], + <<"salt">> => <<"d0888ec5921e45c7af5bc10b47b033ba">>, + <<"sel">> => <<"8b4d79c59adb4df492ebea0bf65dfd4c">>, + <<"targets">> => [], + <<"trackEvents">> => false, + <<"trackEventsFallthrough">> => true, <<"variations">> => [true,false], <<"version">> => 5 } @@ -135,6 +199,7 @@ get_simple_flag_debug() -> <<"sel">> => <<"8b4d79c59adb4df492ebea0bf65dfd4c">>, <<"targets">> => [], <<"trackEvents">> => false, + <<"trackEventsFallthrough">> => false, <<"variations">> => [true,false], <<"version">> => 5 } @@ -267,6 +332,108 @@ add_flag_eval_events_flush_with_track_no_reasons(_) -> } ] = ActualEvents. +add_flag_eval_events_flush_with_track_experimentation_rule(_) -> + {FlagKey, FlagBin} = get_simple_flag_experimentation_rule(), + Flag = eld_flag:new(FlagKey, FlagBin), + Event = eld_event:new_flag_eval( + 5, + <<"variation-value-5">>, + <<"default-value">>, + #{key => <<"12345-track-experimentation-rule">>}, + {rule_match, 0, <<"12345">>}, + Flag + ), + Events = [Event], + ActualEvents = send_await_events(Events, #{flush => true, include_reasons => false}), + [ + #{ + <<"features">> := #{ + <<"abc">> := #{ + <<"counters">> := [ + #{ + <<"count">> := 1, + <<"unknown">> := false, + <<"value">> := <<"variation-value-5">>, + <<"variation">> := 5, + <<"version">> := 5 + } + ], + <<"default">> := <<"default-value">> + } + }, + <<"kind">> := <<"summary">>, + <<"startDate">> := _, + <<"endDate">> := _ + }, + #{ + <<"kind">> := <<"index">>, + <<"user">> := #{<<"key">> := <<"12345-track-experimentation-rule">>}, + <<"creationDate">> := _ + }, + #{ + <<"kind">> := <<"feature">>, + <<"key">> := <<"abc">>, + <<"default">> := <<"default-value">>, + <<"reason">> := #{<<"kind">> := <<"RULE_MATCH">>, <<"ruleId">> := <<"12345">>, <<"ruleIndex">> := 0}, + <<"userKey">> := <<"12345-track-experimentation-rule">>, + <<"value">> := <<"variation-value-5">>, + <<"variation">> := 5, + <<"version">> := 5, + <<"creationDate">> := _ + } + ] = ActualEvents. + +add_flag_eval_events_flush_with_track_experimentation_fallthrough(_) -> + {FlagKey, FlagBin} = get_simple_flag_experimentation_fallthrough(), + Flag = eld_flag:new(FlagKey, FlagBin), + Event = eld_event:new_flag_eval( + 5, + <<"variation-value-5">>, + <<"default-value">>, + #{key => <<"12345-track-experimentation-fallthrough">>}, + fallthrough, + Flag + ), + Events = [Event], + ActualEvents = send_await_events(Events, #{flush => true, include_reasons => false}), + [ + #{ + <<"features">> := #{ + <<"abc">> := #{ + <<"counters">> := [ + #{ + <<"count">> := 1, + <<"unknown">> := false, + <<"value">> := <<"variation-value-5">>, + <<"variation">> := 5, + <<"version">> := 5 + } + ], + <<"default">> := <<"default-value">> + } + }, + <<"kind">> := <<"summary">>, + <<"startDate">> := _, + <<"endDate">> := _ + }, + #{ + <<"kind">> := <<"index">>, + <<"user">> := #{<<"key">> := <<"12345-track-experimentation-fallthrough">>}, + <<"creationDate">> := _ + }, + #{ + <<"kind">> := <<"feature">>, + <<"key">> := <<"abc">>, + <<"default">> := <<"default-value">>, + <<"reason">> := #{<<"kind">> := <<"FALLTHROUGH">>}, + <<"userKey">> := <<"12345-track-experimentation-fallthrough">>, + <<"value">> := <<"variation-value-5">>, + <<"variation">> := 5, + <<"version">> := 5, + <<"creationDate">> := _ + } + ] = ActualEvents. + add_flag_eval_events_flush_with_track_inline(_) -> {FlagKey, FlagBin} = get_simple_flag_track(), Flag = eld_flag:new(FlagKey, FlagBin), @@ -460,7 +627,8 @@ add_identify_events(_) -> add_custom_events(_) -> Event1 = eld_event:new_custom(<<"event-foo">>, #{key => <<"12345">>}, #{k1 => <<"v1">>}), Event2 = eld_event:new_custom(<<"event-bar">>, #{key => <<"abcde">>}, #{k2 => <<"v2">>}), - Events = [Event1, Event2], + Event3 = eld_event:new_custom(<<"event-baz">>, #{key => <<"98765">>}, #{k3 => <<"v3">>}, 123), + Events = [Event1, Event2, Event3], ActualEvents = send_await_events(Events, #{flush => true}), [ #{ @@ -486,6 +654,19 @@ add_custom_events(_) -> <<"userKey">> := <<"abcde">>, <<"data">> := #{<<"k2">> := <<"v2">>}, <<"creationDate">> := _ + }, + #{ + <<"kind">> := <<"index">>, + <<"user">> := #{<<"key">> := <<"98765">>}, + <<"creationDate">> := _ + }, + #{ + <<"key">> := <<"event-baz">>, + <<"kind">> := <<"custom">>, + <<"userKey">> := <<"98765">>, + <<"data">> := #{<<"k3">> := <<"v3">>}, + <<"metricValue">> := 123, + <<"creationDate">> := _ } ] = ActualEvents. diff --git a/test/eld_poll_SUITE.erl b/test/eld_poll_SUITE.erl new file mode 100644 index 0000000..44b4650 --- /dev/null +++ b/test/eld_poll_SUITE.erl @@ -0,0 +1,80 @@ +-module(eld_poll_SUITE). + +-include_lib("common_test/include/ct.hrl"). + +%% ct functions +-export([all/0]). +-export([init_per_suite/1]). +-export([end_per_suite/1]). +-export([init_per_testcase/2]). +-export([end_per_testcase/2]). + +%% Tests +-export([ + response_error_unauthorized/1, + response_error_not_found/1, + response_error_internal/1, + response_flags_segments/1 +]). + +%%==================================================================== +%% ct functions +%%==================================================================== + +all() -> + [ + response_error_unauthorized, + response_error_not_found, + response_error_internal, + response_flags_segments + ]. + +init_per_suite(Config) -> + {ok, _} = application:ensure_all_started(eld), + Config. + +end_per_suite(_) -> + ok = application:stop(eld). + +init_per_testcase(_, Config) -> + Config. + +end_per_testcase(_, _Config) -> + ok. + +%%==================================================================== +%% Helpers +%%==================================================================== + +instance_options() -> + #{ + stream => false, + polling_update_requestor => eld_update_requestor_test + }. + +%%==================================================================== +%% Tests +%%==================================================================== + +response_error_unauthorized(_) -> + ok = eld:start_instance("sdk-key-unauthorized", instance_options()), + [] = eld_storage_ets:list(default, flags), + ok = eld:stop_instance(). + +response_error_not_found(_) -> + ok = eld:start_instance("sdk-key-not-found", instance_options()), + [] = eld_storage_ets:list(default, flags), + ok = eld:stop_instance(). + +response_error_internal(_) -> + ok = eld:start_instance("sdk-key-internal-error", instance_options()), + [] = eld_storage_ets:list(default, flags), + ok = eld:stop_instance(). + +response_flags_segments(_) -> + ok = eld:start_instance("sdk-key-flags-segments", instance_options()), + {FlagKey, _, FlagMap} = eld_update_requestor_test:get_simple_flag(), + {SegmentKey, _, SegmentMap} = eld_update_requestor_test:get_simple_segment(), + [{FlagKey, FlagMap}] = eld_storage_ets:list(default, flags), + [{SegmentKey, SegmentMap}] = eld_storage_ets:list(default, segments), + ok = eld:stop_instance(). diff --git a/test/eld_storage_ets_SUITE.erl b/test/eld_storage_ets_SUITE.erl index 33f83ef..3f03858 100644 --- a/test/eld_storage_ets_SUITE.erl +++ b/test/eld_storage_ets_SUITE.erl @@ -14,6 +14,7 @@ server_init/1, server_bucket_exists/1, server_get_put/1, + server_put_clean/1, server_list/1, server_process_events_put/1, server_process_events_patch/1 @@ -28,6 +29,7 @@ all() -> server_init, server_bucket_exists, server_get_put, + server_put_clean, server_list, server_process_events_put, server_process_events_patch @@ -35,8 +37,12 @@ all() -> init_per_suite(Config) -> {ok, _} = application:ensure_all_started(eld), - eld:start_instance("", #{start_stream => false}), - eld:start_instance("", another1, #{start_stream => false}), + Options = #{ + stream => false, + polling_update_requestor => eld_update_requestor_test + }, + eld:start_instance("", Options), + eld:start_instance("", another1, Options), Config. end_per_suite(_) -> @@ -80,13 +86,16 @@ server_get_put(_) -> [{<<"flag1">>, [{<<"valueA">>, 0.9}]}] = eld_storage_ets:get(another1, flags, <<"flag1">>), [{<<"flag1">>, [{<<"value1">>, 0.5}]}] = eld_storage_ets:get(default, flags, <<"flag1">>). +server_put_clean(_) -> + ok = eld_storage_ets:put(default, flags, #{<<"flag1">> => [{<<"value1">>, 0.5}]}), + ok = eld_storage_ets:put_clean(default, flags, #{<<"flag2">> => [{<<"value2">>, 0.9}]}), + [{<<"flag2">>, [{<<"value2">>, 0.9}]}] = eld_storage_ets:list(default, flags). + server_list(_) -> [] = eld_storage_ets:list(default, flags), [] = eld_storage_ets:list(another1, flags), - ok = eld_storage_ets:put(default, flags, #{<<"flag1">> => [{<<"value1">>, 0.5}]}), - ok = eld_storage_ets:put(default, flags, #{<<"flag2">> => [{<<"value2">>, 0.7}]}), - ok = eld_storage_ets:put(another1, flags, #{<<"flag1">> => [{<<"value1">>, 0.9}]}), - ok = eld_storage_ets:put(another1, flags, #{<<"flag5">> => [{<<"value2">>, 0.77}]}), + ok = eld_storage_ets:put(default, flags, #{<<"flag1">> => [{<<"value1">>, 0.5}], <<"flag2">> => [{<<"value2">>, 0.7}]}), + ok = eld_storage_ets:put(another1, flags, #{<<"flag1">> => [{<<"value1">>, 0.9}], <<"flag5">> => [{<<"value2">>, 0.77}]}), [ {<<"flag1">>, [{<<"value1">>, 0.5}]}, {<<"flag2">>, [{<<"value2">>, 0.7}]} @@ -111,7 +120,7 @@ server_process_events_put(_) -> } } }, - ok = eld_stream_server:process_items(put, Event, eld_storage_ets, default), + ok = eld_update_stream_server:process_items(put, Event, eld_storage_ets, default), [ {<<"flag-key-1">>, <<"flag-value-1">>}, {<<"flag-key-2">>, <<"flag-value-2">>}, @@ -133,12 +142,12 @@ server_process_events_patch(_) -> <<"segments">> => #{} } }, - ok = eld_stream_server:process_items(put, PutEvent, eld_storage_ets, default), + ok = eld_update_stream_server:process_items(put, PutEvent, eld_storage_ets, default), PatchEvent = #{ <<"path">> => <<"/flags/flag-key-2">>, <<"data">> => #{<<"key">> => <<"flag-key-2">>, <<"on">> => false} }, - ok = eld_stream_server:process_items(patch, PatchEvent, eld_storage_ets, default), + ok = eld_update_stream_server:process_items(patch, PatchEvent, eld_storage_ets, default), [ {<<"flag-key-1">>, #{<<"key">> := <<"flag-key-1">>, <<"on">> := true}}, {<<"flag-key-2">>, #{<<"key">> := <<"flag-key-2">>, <<"on">> := false}} diff --git a/test/eld_storage_map_SUITE.erl b/test/eld_storage_map_SUITE.erl index 1649daa..4ce7b04 100644 --- a/test/eld_storage_map_SUITE.erl +++ b/test/eld_storage_map_SUITE.erl @@ -14,6 +14,7 @@ server_init/1, server_bucket_exists/1, server_get_put/1, + server_put_clean/1, server_list/1, server_process_events_put/1, server_process_events_patch/1 @@ -28,6 +29,7 @@ all() -> server_init, server_bucket_exists, server_get_put, + server_put_clean, server_list, server_process_events_put, server_process_events_patch @@ -35,8 +37,13 @@ all() -> init_per_suite(Config) -> {ok, _} = application:ensure_all_started(eld), - eld:start_instance("", #{storage_backend => eld_storage_map, start_stream => false}), - eld:start_instance("", another1, #{storage_backend => eld_storage_map, start_stream => false}), + Options = #{ + storage_backend => eld_storage_map, + stream => false, + polling_update_requestor => eld_update_requestor_test + }, + eld:start_instance("", Options), + eld:start_instance("", another1, Options), Config. end_per_suite(_) -> @@ -81,6 +88,12 @@ server_get_put(_) -> [{<<"flag1">>, [{<<"valueA">>, 0.9}]}] = eld_storage_map:get(another1, flags, <<"flag1">>), [{<<"flag1">>, [{<<"value1">>, 0.5}]}] = eld_storage_map:get(default, flags, <<"flag1">>). +server_put_clean(_) -> + ok = eld_storage_map:put(default, flags, #{<<"flag1">> => [{<<"value1">>, 0.5}]}), + [{<<"flag1">>, [{<<"value1">>, 0.5}]}] = eld_storage_map:get(default, flags, <<"flag1">>), + ok = eld_storage_map:put_clean(default, flags, #{<<"flag2">> => [{<<"value2">>, 0.9}]}), + [{<<"flag2">>, [{<<"value2">>, 0.9}]}] = eld_storage_map:list(default, flags). + server_list(_) -> [] = eld_storage_map:list(default, flags), [] = eld_storage_map:list(another1, flags), @@ -112,7 +125,7 @@ server_process_events_put(_) -> } } }, - ok = eld_stream_server:process_items(put, Event, eld_storage_map, default), + ok = eld_update_stream_server:process_items(put, Event, eld_storage_map, default), [ {<<"flag-key-1">>, <<"flag-value-1">>}, {<<"flag-key-2">>, <<"flag-value-2">>}, @@ -134,12 +147,12 @@ server_process_events_patch(_) -> <<"segments">> => #{} } }, - ok = eld_stream_server:process_items(put, PutEvent, eld_storage_map, default), + ok = eld_update_stream_server:process_items(put, PutEvent, eld_storage_map, default), PatchEvent = #{ <<"path">> => <<"/flags/flag-key-2">>, <<"data">> => #{<<"key">> => <<"flag-key-2">>, <<"on">> => false} }, - ok = eld_stream_server:process_items(patch, PatchEvent, eld_storage_map, default), + ok = eld_update_stream_server:process_items(patch, PatchEvent, eld_storage_map, default), [ {<<"flag-key-1">>, #{<<"key">> := <<"flag-key-1">>, <<"on">> := true}}, {<<"flag-key-2">>, #{<<"key">> := <<"flag-key-2">>, <<"on">> := false}} diff --git a/test/eld_stream_SUITE.erl b/test/eld_stream_SUITE.erl index f3ca045..4e84a89 100644 --- a/test/eld_stream_SUITE.erl +++ b/test/eld_stream_SUITE.erl @@ -33,7 +33,11 @@ all() -> init_per_suite(Config) -> {ok, _} = application:ensure_all_started(eld), - eld:start_instance("", #{start_stream => false}), + Options = #{ + stream => false, + polling_update_requestor => eld_update_requestor_test + }, + eld:start_instance("", Options), Config. end_per_suite(_) -> @@ -247,7 +251,6 @@ get_flag_with_extra_property() -> } }. - %%==================================================================== %% Tests %%==================================================================== @@ -262,12 +265,12 @@ server_process_event_put_patch(_) -> "\"segments\":{}", "}", "}">>, - ok = eld_stream_server:process_event(#{event => <<"put">>, data => PutData}, eld_storage_ets, default), + ok = eld_update_stream_server:process_event(#{event => <<"put">>, data => PutData}, eld_storage_ets, default), [] = eld_storage_ets:list(default, segments), [{FlagSimpleKey, FlagSimpleMap}] = eld_storage_ets:list(default, flags), {FlagSimpleKey, FlagPatchBin, FlagPatchMap} = get_simple_flag_patch(), PatchData = <<"{\"path\":\"/flags/", FlagSimpleKey/binary, "\",", FlagPatchBin/binary, "}">>, - ok = eld_stream_server:process_event(#{event => <<"patch">>, data => PatchData}, eld_storage_ets, default), + ok = eld_update_stream_server:process_event(#{event => <<"patch">>, data => PatchData}, eld_storage_ets, default), [{FlagSimpleKey, FlagPatchMap}] = eld_storage_ets:list(default, flags), ok. @@ -281,12 +284,12 @@ server_process_event_put_patch_flag_with_extra_property(_) -> "\"segments\":{}", "}", "}">>, - ok = eld_stream_server:process_event(#{event => <<"put">>, data => PutData}, eld_storage_ets, default), + ok = eld_update_stream_server:process_event(#{event => <<"put">>, data => PutData}, eld_storage_ets, default), [] = eld_storage_ets:list(default, segments), [{FlagSimpleKey, FlagSimpleMap}] = eld_storage_ets:list(default, flags), {FlagSimpleKey, FlagPatchBin, FlagPatchMap} = get_simple_flag_patch(), PatchData = <<"{\"path\":\"/flags/", FlagSimpleKey/binary, "\",", FlagPatchBin/binary, "}">>, - ok = eld_stream_server:process_event(#{event => <<"patch">>, data => PatchData}, eld_storage_ets, default), + ok = eld_update_stream_server:process_event(#{event => <<"patch">>, data => PatchData}, eld_storage_ets, default), [{FlagSimpleKey, FlagPatchMap}] = eld_storage_ets:list(default, flags), ok. @@ -300,12 +303,12 @@ server_process_event_put_patch_old_version(_) -> "\"segments\":{}", "}", "}">>, - ok = eld_stream_server:process_event(#{event => <<"put">>, data => PutData}, eld_storage_ets, default), + ok = eld_update_stream_server:process_event(#{event => <<"put">>, data => PutData}, eld_storage_ets, default), [] = eld_storage_ets:list(default, segments), [{FlagSimpleKey, FlagSimpleMap}] = eld_storage_ets:list(default, flags), {FlagSimpleKey, FlagPatchBin, _FlagPatchMap} = get_simple_flag_patch_old(), PatchData = <<"{\"path\":\"/flags/", FlagSimpleKey/binary, "\",", FlagPatchBin/binary, "}">>, - ok = eld_stream_server:process_event(#{event => <<"patch">>, data => PatchData}, eld_storage_ets, default), + ok = eld_update_stream_server:process_event(#{event => <<"patch">>, data => PatchData}, eld_storage_ets, default), [{FlagSimpleKey, FlagSimpleMap}] = eld_storage_ets:list(default, flags), ok. @@ -319,10 +322,10 @@ server_process_event_put_delete(_) -> "\"segments\":{}", "}", "}">>, - ok = eld_stream_server:process_event(#{event => <<"put">>, data => PutData}, eld_storage_ets, default), + ok = eld_update_stream_server:process_event(#{event => <<"put">>, data => PutData}, eld_storage_ets, default), [] = eld_storage_ets:list(default, segments), [{FlagSimpleKey, FlagSimpleMap}] = eld_storage_ets:list(default, flags), - ok = eld_stream_server:process_event(#{event => <<"delete">>, data => PutData}, eld_storage_ets, default), + ok = eld_update_stream_server:process_event(#{event => <<"delete">>, data => PutData}, eld_storage_ets, default), {FlagSimpleKey, _FlagDeleteBin, FlagDeleteMap} = get_simple_flag_delete(), [{FlagSimpleKey, FlagDeleteMap}] = eld_storage_ets:list(default, flags), ok. @@ -337,11 +340,11 @@ server_process_event_put_delete_single(_) -> "\"segments\":{}", "}", "}">>, - ok = eld_stream_server:process_event(#{event => <<"put">>, data => PutData}, eld_storage_ets, default), + ok = eld_update_stream_server:process_event(#{event => <<"put">>, data => PutData}, eld_storage_ets, default), [] = eld_storage_ets:list(default, segments), [{FlagSimpleKey, FlagSimpleMap}] = eld_storage_ets:list(default, flags), DeleteData = <<"{\"path\":\"/flags/", FlagSimpleKey/binary, "\"}">>, - ok = eld_stream_server:process_event(#{event => <<"delete">>, data => DeleteData}, eld_storage_ets, default), + ok = eld_update_stream_server:process_event(#{event => <<"delete">>, data => DeleteData}, eld_storage_ets, default), {FlagSimpleKey, _FlagDeleteBin, FlagDeleteMap} = get_simple_flag_delete(), [{FlagSimpleKey, FlagDeleteMap}] = eld_storage_ets:list(default, flags), ok. diff --git a/test/eld_update_requestor_test.erl b/test/eld_update_requestor_test.erl new file mode 100644 index 0000000..0568cb8 --- /dev/null +++ b/test/eld_update_requestor_test.erl @@ -0,0 +1,106 @@ +%%------------------------------------------------------------------- +%% @doc Polling update requestor for testing +%% +%% @end +%%------------------------------------------------------------------- + +-module(eld_update_requestor_test). + +-behaviour(eld_update_requestor). + +%% API +-export([get_simple_flag/0]). +-export([get_simple_segment/0]). + +%% Behavior callbacks +-export([all/2]). + +%%=================================================================== +%% API +%%=================================================================== + +get_simple_flag() -> + { + <<"abc">>, + <<"\"abc\":{", + "\"clientSide\":false,", + "\"debugEventsUntilDate\":null,", + "\"deleted\":false,", + "\"fallthrough\":{\"variation\":0},", + "\"key\":\"abc\",", + "\"offVariation\":1,", + "\"on\":true,", + "\"prerequisites\":[],", + "\"rules\":[],", + "\"salt\":\"d0888ec5921e45c7af5bc10b47b033ba\",", + "\"sel\":\"8b4d79c59adb4df492ebea0bf65dfd4c\",", + "\"targets\":[],", + "\"trackEvents\":true,", + "\"variations\":[true,false],", + "\"version\":5", + "}">>, + #{ + <<"clientSide">> => false, + <<"debugEventsUntilDate">> => null, + <<"deleted">> => false, + <<"fallthrough">> => #{<<"variation">> => 0}, + <<"key">> => <<"abc">>, + <<"offVariation">> => 1, + <<"on">> => true, + <<"prerequisites">> => [], + <<"rules">> => [], + <<"salt">> => <<"d0888ec5921e45c7af5bc10b47b033ba">>, + <<"sel">> => <<"8b4d79c59adb4df492ebea0bf65dfd4c">>, + <<"targets">> => [], + <<"trackEvents">> => true, + <<"variations">> => [true,false], + <<"version">> => 5 + } + }. + +get_simple_segment() -> + { + <<"xyz">>, + <<"\"xyz\": {", + "\"deleted\":false,", + "\"excluded\":[],", + "\"included\":[],", + "\"key\":\"xyz\",", + "\"rules\":[],", + "\"salt\":\"b2ba88c74ad34c288ec10ba78e150afd\",", + "\"version\":8", + "}">>, + #{ + <<"deleted">> => false, + <<"excluded">> => [], + <<"included">> => [], + <<"key">> => <<"xyz">>, + <<"rules">> => [], + <<"salt">> => <<"b2ba88c74ad34c288ec10ba78e150afd">>, + <<"version">> => 8 + } + }. + +%%=================================================================== +%% Behavior callbacks +%%=================================================================== + +%% @doc Return static values mocking the polling service +%% +%% @end +-spec all(Uri :: string(), SdkKey :: string()) -> eld_update_requestor:response(). +all(_Uri, SdkKey) -> + case SdkKey of + "sdk-key-unauthorized" -> + {error, 401, ""}; + "sdk-key-not-found" -> + {error, 404, ""}; + "sdk-key-internal-error" -> + {error, 500, ""}; + "sdk-key-flags-segments" -> + {_, FlagBin, _} = get_simple_flag(), + {_, SegmentBin, _} = get_simple_segment(), + {ok, <<"{\"flags\":{",FlagBin/binary,"},\"segments\":{",SegmentBin/binary,"}}">>}; + "" -> + {ok, <<"{\"flags\":{},\"segments\":{}}">>} + end.