Skip to content

Commit 7ce00b2

Browse files
authored
feat:Add SpRateLimiter and SpMemoryManager (#3796)
1 parent ecaa286 commit 7ce00b2

File tree

27 files changed

+1113
-19
lines changed

27 files changed

+1113
-19
lines changed

pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
<eclipse.milo.version>0.6.16</eclipse.milo.version>
6262
<error-prone.version>2.10.0</error-prone.version>
6363
<file-management.version>3.1.0</file-management.version>
64+
<findbugs.version>3.0.2</findbugs.version>
6465
<flink.version>1.13.5</flink.version>
6566
<fogsy-qudt.version>1.0</fogsy-qudt.version>
6667
<geojson-jackson.version>1.14</geojson-jackson.version>
@@ -240,6 +241,12 @@
240241
<artifactId>gson</artifactId>
241242
<version>${gson.version}</version>
242243
</dependency>
244+
<dependency>
245+
<groupId>com.google.code.findbugs</groupId>
246+
<artifactId>jsr305</artifactId>
247+
<version>${findbugs.version}</version>
248+
<scope>compile</scope>
249+
</dependency>
243250
<dependency>
244251
<groupId>com.google.guava</groupId>
245252
<artifactId>guava</artifactId>

streampipes-commons/src/main/java/org/apache/streampipes/commons/constants/Envs.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -170,10 +170,30 @@ public enum Envs {
170170
SP_LOGGING_FILE_PREFIX("SP_LOGGING_FILE_PREFIX", "streampipes"),
171171
SP_LOGGING_FILE_DIR("SP_LOGGING_FILE_DIR", "logs"),
172172
SP_LOGGING_FILE_PATTERN(
173-
"SP_LOGGING_FILE_PATTERN", "%d{yyyy-MM-dd HH:mm:ss} %-5level %logger{36} - %msg%n"
174-
);
175-
173+
"SP_LOGGING_FILE_PATTERN",
174+
"%d{yyyy-MM-dd HH:mm:ss} %-5level %logger{36} - %msg%n"
175+
),
176176

177+
//SpRateLimiter
178+
SP_RATE_LIMITER_DEFAULT_WARMUP_PERIOD("SP_RATE_LIMITER_DEFAULT_WARMUP_PERIOD", "1000"),
179+
SP_RATE_LIMITER_SCHEDULER_INITIAL_DELAY_SECONDS("SP_RATE_LIMITER_SCHEDULER_INITIAL_DELAY_SECONDS", "0"),
180+
SP_RATE_LIMITER_SCHEDULER_PERIOD_SECONDS("SP_RATE_LIMITER_SCHEDULER_PERIOD_SECONDS", "15"),
181+
SP_RATE_LIMITER_STATS_RESET_THRESHOLD("SP_RATE_LIMITER_STATS_RESET_THRESHOLD", "1000"),
182+
SP_RATE_LIMITER_STATS_RESET_FACTOR("SP_RATE_LIMITER_STATS_RESET_FACTOR", "999"),
183+
SP_RATE_LIMITER_STATS_RESET_DIVISOR("SP_RATE_LIMITER_STATS_RESET_DIVISOR", "1000"),
184+
SP_RATE_LIMITER_SHUTDOWN_TIMEOUT_SECONDS("SP_RATE_LIMITER_SHUTDOWN_TIMEOUT_SECONDS", "5"),
185+
SP_RATE_LIMITER_TIMEOUT_MS("SP_RATE_LIMITER_TIMEOUT_MS", "1000"),
186+
SP_RATE_LIMITER_PERMITS_SET_PERCENTAGE("SP_RATE_LIMITER_PERMITS_SET_PERCENTAGE", "0.7"),
187+
188+
//SpMemoryManager
189+
SP_MEMORY_MANAGER_DEFAULT_INITIAL_MEMORY("SP_MEMORY_MANAGER_DEFAULT_INITIAL_MEMORY", "1073741824"),
190+
SP_MEMORY_MANAGER_WAIT_TIMEOUT_MS("SP_MEMORY_MANAGER_WAIT_TIMEOUT_MS", "1000"),
191+
SP_MEMORY_SCHEDULER_INITIAL_DELAY_SECONDS("SP_MEMORY_SCHEDULER_INITIAL_DELAY_SECONDS", "0"),
192+
SP_MEMORY_SCHEDULER_PERIOD_SECONDS("SP_MEMORY_SCHEDULER_PERIOD_SECONDS", "15"),
193+
SP_MEMORY_BYTES_TO_MB("SP_MEMORY_BYTES_TO_MB", "1048576"),
194+
SP_MEMORY_MANAGER_SHUTDOWN_TIMEOUT_SECONDS("SP_MEMORY_MANAGER_SHUTDOWN_TIMEOUT_SECONDS", "5"),
195+
SP_MEMORY_MANAGER_USAGE_THRESHOLD("SP_MEMORY_MANAGER_USAGE_THRESHOLD", "0.9"),
196+
SP_MEMORY_WARNING_THRESHOLD("SP_MEMORY_WARNING_THRESHOLD", "0.8");
177197

178198
private final String envVariableName;
179199
private String defaultValue;

streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/DefaultEnvironment.java

Lines changed: 87 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@
2222
import org.apache.streampipes.commons.environment.parser.OAuthConfigurationParser;
2323
import org.apache.streampipes.commons.environment.variable.BooleanEnvironmentVariable;
2424
import org.apache.streampipes.commons.environment.variable.DoubleEnvironmentVariable;
25+
import org.apache.streampipes.commons.environment.variable.FloatEnvironmentVariable;
2526
import org.apache.streampipes.commons.environment.variable.IntEnvironmentVariable;
27+
import org.apache.streampipes.commons.environment.variable.LongEnvironmentVariable;
2628
import org.apache.streampipes.commons.environment.variable.StringEnvironmentVariable;
27-
import org.apache.streampipes.commons.environment.variable.FloatEnvironmentVariable;
2829

2930
import java.util.List;
3031

@@ -456,7 +457,6 @@ public StringEnvironmentVariable getRetentionLocalDir() {
456457
return new StringEnvironmentVariable(Envs.SP_RETENTION_LOCAL_DIR);
457458
}
458459

459-
460460
@Override
461461
public DoubleEnvironmentVariable getDirMemoryResourceWeight() {
462462
return new DoubleEnvironmentVariable(Envs.DIR_MEMORY_RESOURCE_WEIGHT);
@@ -517,6 +517,91 @@ public BooleanEnvironmentVariable getLoadManagerEnable() {
517517
return new BooleanEnvironmentVariable(Envs.LOAD_MANAGER_ENABLE);
518518
}
519519

520+
@Override
521+
public LongEnvironmentVariable getRateLimiterDefaultWarmupPeriod() {
522+
return new LongEnvironmentVariable(Envs.SP_RATE_LIMITER_DEFAULT_WARMUP_PERIOD);
523+
}
524+
525+
@Override
526+
public IntEnvironmentVariable getRateLimiterSchedulerInitialDelaySeconds() {
527+
return new IntEnvironmentVariable(Envs.SP_RATE_LIMITER_SCHEDULER_INITIAL_DELAY_SECONDS);
528+
}
529+
530+
@Override
531+
public IntEnvironmentVariable getRateLimiterSchedulerPeriodSeconds() {
532+
return new IntEnvironmentVariable(Envs.SP_RATE_LIMITER_SCHEDULER_PERIOD_SECONDS);
533+
}
534+
535+
@Override
536+
public IntEnvironmentVariable getRateLimiterStatsResetThreshold() {
537+
return new IntEnvironmentVariable(Envs.SP_RATE_LIMITER_STATS_RESET_THRESHOLD);
538+
}
539+
540+
@Override
541+
public IntEnvironmentVariable getRateLimiterStatsResetFactor() {
542+
return new IntEnvironmentVariable(Envs.SP_RATE_LIMITER_STATS_RESET_FACTOR);
543+
}
544+
545+
@Override
546+
public IntEnvironmentVariable getRateLimiterStatsResetDivisor() {
547+
return new IntEnvironmentVariable(Envs.SP_RATE_LIMITER_STATS_RESET_DIVISOR);
548+
}
549+
550+
@Override
551+
public IntEnvironmentVariable getRateLimiterShutdownTimeoutSeconds() {
552+
return new IntEnvironmentVariable(Envs.SP_RATE_LIMITER_SHUTDOWN_TIMEOUT_SECONDS);
553+
}
554+
555+
@Override
556+
public LongEnvironmentVariable getRateLimiterTimeoutMs() {
557+
return new LongEnvironmentVariable(Envs.SP_RATE_LIMITER_TIMEOUT_MS);
558+
}
559+
560+
@Override
561+
public DoubleEnvironmentVariable getRateLimiterPermitsSetPercentage() {
562+
return new DoubleEnvironmentVariable(Envs.SP_RATE_LIMITER_PERMITS_SET_PERCENTAGE);
563+
}
564+
565+
@Override
566+
public LongEnvironmentVariable getMemoryManagerDefaultInitialMemory() {
567+
return new LongEnvironmentVariable(Envs.SP_MEMORY_MANAGER_DEFAULT_INITIAL_MEMORY);
568+
}
569+
570+
@Override
571+
public LongEnvironmentVariable getMemoryManagerWaitTimeoutMs() {
572+
return new LongEnvironmentVariable(Envs.SP_MEMORY_MANAGER_WAIT_TIMEOUT_MS);
573+
}
574+
575+
@Override
576+
public IntEnvironmentVariable getMemorySchedulerInitialDelaySeconds() {
577+
return new IntEnvironmentVariable(Envs.SP_MEMORY_SCHEDULER_INITIAL_DELAY_SECONDS);
578+
}
579+
580+
@Override
581+
public IntEnvironmentVariable getMemorySchedulerPeriodSeconds() {
582+
return new IntEnvironmentVariable(Envs.SP_MEMORY_SCHEDULER_PERIOD_SECONDS);
583+
}
584+
585+
@Override
586+
public LongEnvironmentVariable getMemoryBytesToMb() {
587+
return new LongEnvironmentVariable(Envs.SP_MEMORY_BYTES_TO_MB);
588+
}
589+
590+
@Override
591+
public IntEnvironmentVariable getMemoryManagerShutdownTimeoutSeconds() {
592+
return new IntEnvironmentVariable(Envs.SP_MEMORY_MANAGER_SHUTDOWN_TIMEOUT_SECONDS);
593+
}
594+
595+
@Override
596+
public DoubleEnvironmentVariable getMemoryManagerUsageThreshold() {
597+
return new DoubleEnvironmentVariable(Envs.SP_MEMORY_MANAGER_USAGE_THRESHOLD);
598+
}
599+
600+
@Override
601+
public DoubleEnvironmentVariable getMemoryWarningThreshold() {
602+
return new DoubleEnvironmentVariable(Envs.SP_MEMORY_WARNING_THRESHOLD);
603+
}
604+
520605
@Override
521606
public StringEnvironmentVariable getDatalakeSchedulerCron() {
522607
return new StringEnvironmentVariable(Envs.SP_DATALAKE_SCHEDULER_CRON);

streampipes-commons/src/main/java/org/apache/streampipes/commons/environment/Environment.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,12 @@
1818
package org.apache.streampipes.commons.environment;
1919

2020
import org.apache.streampipes.commons.environment.model.OAuthConfiguration;
21-
import org.apache.streampipes.commons.environment.variable.*;
21+
import org.apache.streampipes.commons.environment.variable.BooleanEnvironmentVariable;
22+
import org.apache.streampipes.commons.environment.variable.DoubleEnvironmentVariable;
23+
import org.apache.streampipes.commons.environment.variable.FloatEnvironmentVariable;
24+
import org.apache.streampipes.commons.environment.variable.IntEnvironmentVariable;
25+
import org.apache.streampipes.commons.environment.variable.LongEnvironmentVariable;
26+
import org.apache.streampipes.commons.environment.variable.StringEnvironmentVariable;
2227

2328
import java.util.List;
2429

@@ -234,5 +239,25 @@ public interface Environment {
234239

235240
BooleanEnvironmentVariable getLoadManagerEnable();
236241

242+
//SpRateLimiter
243+
LongEnvironmentVariable getRateLimiterDefaultWarmupPeriod();
244+
IntEnvironmentVariable getRateLimiterSchedulerInitialDelaySeconds();
245+
IntEnvironmentVariable getRateLimiterSchedulerPeriodSeconds();
246+
IntEnvironmentVariable getRateLimiterStatsResetThreshold();
247+
IntEnvironmentVariable getRateLimiterStatsResetFactor();
248+
IntEnvironmentVariable getRateLimiterStatsResetDivisor();
249+
IntEnvironmentVariable getRateLimiterShutdownTimeoutSeconds();
250+
LongEnvironmentVariable getRateLimiterTimeoutMs();
251+
DoubleEnvironmentVariable getRateLimiterPermitsSetPercentage();
252+
253+
//SpMemoryManager
254+
LongEnvironmentVariable getMemoryManagerDefaultInitialMemory();
255+
LongEnvironmentVariable getMemoryManagerWaitTimeoutMs();
256+
IntEnvironmentVariable getMemorySchedulerInitialDelaySeconds();
257+
IntEnvironmentVariable getMemorySchedulerPeriodSeconds();
258+
LongEnvironmentVariable getMemoryBytesToMb();
259+
IntEnvironmentVariable getMemoryManagerShutdownTimeoutSeconds();
260+
DoubleEnvironmentVariable getMemoryManagerUsageThreshold();
261+
DoubleEnvironmentVariable getMemoryWarningThreshold();
237262
StringEnvironmentVariable getDatalakeSchedulerCron();
238263
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.streampipes.commons.environment.variable;
20+
21+
public class LongEnvironmentVariable extends EnvironmentVariable<Long> {
22+
23+
public LongEnvironmentVariable(org.apache.streampipes.commons.constants.Envs envVariable) {
24+
super(envVariable);
25+
}
26+
27+
@Override
28+
public Long parse(String value) {
29+
return Long.parseLong(value);
30+
}
31+
}

streampipes-commons/src/main/java/org/apache/streampipes/commons/exceptions/SpHttpErrorStatusCode.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
* (the "License"); you may not use this file except in compliance with
77
* the License. You may obtain a copy of the License at
88
*
9-
* http://www.apache.org/licenses/LICENSE-2.0
9+
* http://www.apache.org/licenses/LICENSE-2.0
1010
*
1111
* Unless required by applicable law or agreed to in writing, software
1212
* distributed under the License is distributed on an "AS IS" BASIS,

streampipes-commons/src/main/java/org/apache/streampipes/commons/prometheus/service/ElementServiceStats.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@
2020
import org.slf4j.Logger;
2121
import org.slf4j.LoggerFactory;
2222

23-
import java.util.Map;
24-
2523
/**
2624
* Service Statistics
2725
*/
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.streampipes.commons.prometheus.spmemorymanager;
20+
21+
import org.apache.streampipes.commons.prometheus.StreamPipesCollectorRegistry;
22+
23+
import io.prometheus.client.Gauge;
24+
25+
/**
26+
* Memory Manager Metrics Manager
27+
*/
28+
public class SpMemoryManagerMetrics {
29+
30+
public static final Gauge MEMORY_USED_BYTES = StreamPipesCollectorRegistry.registerGauge(
31+
"sp_memory_used_bytes",
32+
"Amount of memory used in bytes"
33+
);
34+
35+
public static final Gauge MEMORY_ALLOCATION_RATE = StreamPipesCollectorRegistry.registerGauge(
36+
"sp_memory_allocation_rate_bytes_per_second",
37+
"Memory allocation rate in bytes per second"
38+
);
39+
40+
public static void updateCoreMetrics(double memoryUsedBytes, double allocationRate) {
41+
MEMORY_USED_BYTES.set(memoryUsedBytes);
42+
MEMORY_ALLOCATION_RATE.set(allocationRate);
43+
}
44+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.streampipes.commons.prometheus.spmemorymanager;
20+
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
23+
24+
/**
25+
* Memory Manager Statistics
26+
*/
27+
public class SpMemoryManagerStats {
28+
29+
private static final Logger log = LoggerFactory.getLogger(SpMemoryManagerStats.class);
30+
31+
public double memoryUsedBytes = 0.0;
32+
public double allocationRate = 0.0;
33+
34+
private final SpMemoryManagerMetrics metrics;
35+
36+
public SpMemoryManagerStats() {
37+
this.metrics = new SpMemoryManagerMetrics();
38+
}
39+
40+
/**
41+
* Update all metrics with custom total memory
42+
*/
43+
public void updateAllMetrics() {
44+
SpMemoryManagerMetrics.updateCoreMetrics(memoryUsedBytes, allocationRate);
45+
}
46+
47+
public double getMemoryUsedBytes() {
48+
return memoryUsedBytes;
49+
}
50+
51+
public void setMemoryUsedBytes(double memoryUsedBytes) {
52+
this.memoryUsedBytes = memoryUsedBytes;
53+
}
54+
55+
public double getAllocationRate() {
56+
return allocationRate;
57+
}
58+
59+
public void setAllocationRate(double allocationRate) {
60+
this.allocationRate = allocationRate;
61+
}
62+
63+
public SpMemoryManagerMetrics getMetrics() {
64+
return metrics;
65+
}
66+
}

0 commit comments

Comments
 (0)