1- using System . Diagnostics ;
2- using System . Net ;
31using Microsoft . Extensions . DependencyInjection ;
42using Microsoft . Extensions . Hosting ;
53using Microsoft . Extensions . Logging ;
64using Orleans . Configuration ;
7- using Orleans . Runtime . Placement ;
85
9- #nullable enable
10-
11- // Ledjon: The silos will run in the same process so they will have the same memory usage.
12- // I previously had 4 console apps to run the example, but didn't want to add so many proj into the solution.
13- // I am sure with something like Aspire that would be easier, but for now I'll leave them like this.
14- // You (the reader) feel free to run this in different processes for a more realistic example.
15-
16- var host0 = await StartSiloHost ( 0 ) ;
17- var host1 = await StartSiloHost ( 1 ) ;
18- var host2 = await StartSiloHost ( 2 ) ;
19- var host3 = await StartSiloHost ( 3 ) ;
20- IHost ? host5 = null ;
21-
22- Console . WriteLine ( "All silos have started." ) ;
23-
24- var grainFactory = host0 . Services . GetRequiredService < IGrainFactory > ( ) ;
25- var mgmtGrain = grainFactory . GetGrain < IManagementGrain > ( 0 ) ;
26-
27- var silos = await mgmtGrain . GetHosts ( onlyActive : true ) ;
28- Debug . Assert ( silos . Count == 4 ) ;
29- var addresses = silos . Select ( x => x . Key ) . ToArray ( ) ;
30-
31- var tasks = new List < Task > ( ) ;
32- RequestContext . Set ( IPlacementDirector . PlacementHintKey , addresses [ 0 ] ) ;
33- for ( var i = 0 ; i < 300 ; i ++ )
34- {
35- tasks . Add ( grainFactory . GetGrain < IRebalancingTestGrain > ( Guid . NewGuid ( ) ) . Ping ( ) ) ;
36- }
37-
38- RequestContext . Set ( IPlacementDirector . PlacementHintKey , addresses [ 1 ] ) ;
39- for ( var i = 0 ; i < 30 ; i ++ )
40- {
41- tasks . Add ( grainFactory . GetGrain < IRebalancingTestGrain > ( Guid . NewGuid ( ) ) . Ping ( ) ) ;
42- }
43-
44- RequestContext . Set ( IPlacementDirector . PlacementHintKey , addresses [ 2 ] ) ;
45- for ( var i = 0 ; i < 410 ; i ++ )
46- {
47- tasks . Add ( grainFactory . GetGrain < IRebalancingTestGrain > ( Guid . NewGuid ( ) ) . Ping ( ) ) ;
48- }
49-
50- RequestContext . Set ( IPlacementDirector . PlacementHintKey , addresses [ 3 ] ) ;
51- for ( var i = 0 ; i < 120 ; i ++ )
52- {
53- tasks . Add ( grainFactory . GetGrain < IRebalancingTestGrain > ( Guid . NewGuid ( ) ) . Ping ( ) ) ;
54- }
55-
56- var sessionCount = 0 ;
57- while ( true )
58- {
59- if ( sessionCount == 25 )
6+ var builder = Host . CreateApplicationBuilder ( args ) ;
7+ builder . AddKeyedRedisClient ( "orleans-redis" ) ;
8+ builder . Logging . AddFilter ( "Orleans.Runtime.Placement.Rebalancing" , LogLevel . Trace ) ;
9+ #pragma warning disable ORLEANSEXP002
10+ builder . UseOrleans ( builder => builder
11+ . Configure < GrainCollectionOptions > ( o =>
6012 {
61- RequestContext . Set ( IPlacementDirector . PlacementHintKey , addresses [ 0 ] ) ;
62- for ( var i = 0 ; i < 50 ; i ++ )
63- {
64- tasks . Add ( grainFactory . GetGrain < IRebalancingTestGrain > ( Guid . NewGuid ( ) ) . Ping ( ) ) ;
65- }
66-
67- RequestContext . Set ( IPlacementDirector . PlacementHintKey , addresses [ 1 ] ) ;
68- for ( var i = 0 ; i < 50 ; i ++ )
69- {
70- tasks . Add ( grainFactory . GetGrain < IRebalancingTestGrain > ( Guid . NewGuid ( ) ) . Ping ( ) ) ;
71- }
72- }
73-
74- if ( sessionCount == 35 )
13+ o . CollectionQuantum = TimeSpan . FromSeconds ( 15 ) ;
14+ } )
15+ . Configure < ResourceOptimizedPlacementOptions > ( o =>
7516 {
76- RequestContext . Set ( IPlacementDirector . PlacementHintKey , addresses [ 1 ] ) ;
77- for ( var i = 0 ; i < 50 ; i ++ )
78- {
79- tasks . Add ( grainFactory . GetGrain < IRebalancingTestGrain > ( Guid . NewGuid ( ) ) . Ping ( ) ) ;
80- }
17+ o . LocalSiloPreferenceMargin = 0 ;
18+ } )
19+ . Configure < ActivationRebalancerOptions > ( o =>
20+ {
21+ o . RebalancerDueTime = TimeSpan . FromSeconds ( 5 ) ;
22+ o . SessionCyclePeriod = TimeSpan . FromSeconds ( 5 ) ;
23+ // uncomment these below, if you want higher migration rate
24+ //o.CycleNumberWeight = 1;
25+ //o.SiloNumberWeight = 0;
26+ } )
27+ . AddActivationRebalancer ( ) ) ;
28+ #pragma warning restore ORLEANSEXP002
8129
82- RequestContext . Set ( IPlacementDirector . PlacementHintKey , addresses [ 2 ] ) ;
83- for ( var i = 0 ; i < 50 ; i ++ )
84- {
85- tasks . Add ( grainFactory . GetGrain < IRebalancingTestGrain > ( Guid . NewGuid ( ) ) . Ping ( ) ) ;
86- }
87- }
30+ builder . Services . AddHostedService < LoadDriverBackgroundService > ( ) ;
31+ var app = builder . Build ( ) ;
8832
89- if ( sessionCount == 40 )
90- {
91- host5 = await StartSiloHost ( 4 ) ;
92- }
33+ await app . RunAsync ( ) ;
9334
94- if ( sessionCount == 45 )
35+ internal class LoadDriverBackgroundService ( IGrainFactory client ) : BackgroundService
36+ {
37+ protected override async Task ExecuteAsync ( CancellationToken stoppingToken )
9538 {
96- RequestContext . Set ( IPlacementDirector . PlacementHintKey , addresses [ 2 ] ) ;
97- for ( var i = 0 ; i < 50 ; i ++ )
39+ while ( ! stoppingToken . IsCancellationRequested )
9840 {
99- tasks . Add ( grainFactory . GetGrain < IRebalancingTestGrain > ( Guid . NewGuid ( ) ) . Ping ( ) ) ;
100- }
41+ for ( var i = 0 ; i < 5 * Random . Shared . Next ( 1 , 1000 ) ; i ++ )
42+ {
43+ await client . GetGrain < IRebalancingTestGrain > ( Guid . NewGuid ( ) ) . Ping ( ) ;
44+ }
10145
102- RequestContext . Set ( IPlacementDirector . PlacementHintKey , addresses [ 3 ] ) ;
103- for ( var i = 0 ; i < 50 ; i ++ )
104- {
105- tasks . Add ( grainFactory . GetGrain < IRebalancingTestGrain > ( Guid . NewGuid ( ) ) . Ping ( ) ) ;
46+ await Task . Delay ( Random . Shared . Next ( 500 , 1_000 ) , stoppingToken ) ;
10647 }
10748 }
108-
109- await Task . Delay ( 5000 ) ; // session duration
110- sessionCount ++ ;
111-
112- if ( sessionCount > 55 )
113- {
114- break ;
115- }
116- }
117-
118- Console . WriteLine ( "Simulation has finished. Press Enter to terminate..." ) ;
119- Console . ReadLine ( ) ;
120-
121- await host0 . StopAsync ( ) ;
122- await host1 . StopAsync ( ) ;
123- await host2 . StopAsync ( ) ;
124- await host3 . StopAsync ( ) ;
125-
126- if ( host5 != null )
127- {
128- await host5 . StopAsync ( ) ;
129- }
130-
131- static async Task < IHost > StartSiloHost ( int num )
132- {
133- #pragma warning disable ORLEANSEXP002
134- var host = Host . CreateDefaultBuilder ( )
135- . ConfigureLogging ( builder => builder
136- . AddFilter ( "" , LogLevel . Error )
137- . AddFilter ( "Orleans.Runtime.Placement.Rebalancing" , LogLevel . Trace )
138- . AddConsole ( ) )
139- . UseOrleans ( builder => builder
140- . Configure < ActivationRebalancerOptions > ( o =>
141- {
142- o . RebalancerDueTime = TimeSpan . FromSeconds ( 5 ) ;
143- o . SessionCyclePeriod = TimeSpan . FromSeconds ( 5 ) ;
144- // uncomment these below, if you want higher migration rate
145- //o.CycleNumberWeight = 1;
146- //o.SiloNumberWeight = 0;
147- } )
148- . UseLocalhostClustering (
149- siloPort : EndpointOptions . DEFAULT_SILO_PORT + num ,
150- gatewayPort : EndpointOptions . DEFAULT_GATEWAY_PORT + num ,
151- primarySiloEndpoint : new IPEndPoint ( IPAddress . Loopback , EndpointOptions . DEFAULT_SILO_PORT ) )
152- . AddActivationRebalancer ( ) )
153- . Build ( ) ;
154- #pragma warning restore ORLEANSEXP002
155-
156- await host . StartAsync ( ) ;
157- Console . WriteLine ( $ "Silo{ num } started.") ;
158-
159- return host ;
16049}
16150
16251public interface IRebalancingTestGrain : IGrainWithGuidKey
16352{
16453 Task Ping ( ) ;
16554}
16655
56+ [ CollectionAgeLimit ( Minutes = 0.5 ) ]
16757public class RebalancingTestGrain : Grain , IRebalancingTestGrain
16858{
16959 public Task Ping ( ) => Task . CompletedTask ;
170- }
60+ }
0 commit comments