Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add rate limiting to Sumo Logic calls #9

Merged
merged 6 commits into from
Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "barky",
"version": "1.1.9",
"version": "1.1.11",
"description": "A simple cloud services watchdog with digest notification support & no external dependencies",
"homepage": "https://github.com/Rohland/barky#readme",
"main": "dist/cli.js",
Expand Down
84 changes: 84 additions & 0 deletions src/evaluators/sumo.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import { executeSumoRequest } from "./sumo";

describe('sumo ', () => {
describe('executeSumoRequest', () => {
describe("when executed with success", () => {
it("should return result", async () => {
// arrange
const request = jest.fn().mockResolvedValue("result");

// act
const result = await executeSumoRequest("test", request);

// assert
expect(result).toEqual("result");
expect(request).toHaveBeenCalledTimes(1);
});
});
describe("when failure", () => {
it("should throw error", async () => {
// arrange
const request = jest.fn().mockRejectedValue(new Error("error"));

// act
let error;
try {
await executeSumoRequest("test", request);
} catch (err) {
error = err;
}

// assert
expect(error).toEqual(new Error("error"));
expect(request).toHaveBeenCalledTimes(1);
});
});
describe("when request takes some time", () => {
it("should wait", async () => {
// arrange
const request = jest.fn().mockImplementation(() => new Promise(resolve => setTimeout(() => resolve("result"), 100)));

// act
const start = performance.now();
const result = await executeSumoRequest("test", request);
const end = performance.now();

// assert
expect(result).toEqual("result");
expect(end - start).toBeGreaterThanOrEqual(95);
expect(request).toHaveBeenCalledTimes(1);
});
});
describe("when multiple requests sent", () => {
it("should queue them and only execute 5 per second", async () => {
// arrange
const count = 20;
const requests = [];
const countPerSecond = new Map<number, number>();
for (let i = 0; i < count; i++) {
const req = jest.fn().mockImplementation(() => new Promise(resolve => {
const time = Math.round(performance.now() / 1000);
const count = countPerSecond.get(time) ?? 0;
countPerSecond.set(time, count + 1);
setTimeout(() => resolve(`result${ i }`), 100)
}));
requests.push(req);
}

// act
const start = performance.now();
const result = await Promise.all(requests.map(x => executeSumoRequest("test", x)));
const end = performance.now();

// assert
expect(result).toEqual(requests.map((_x,i) => `result${ i }`));
expect(end - start).toBeGreaterThanOrEqual(2000);
expect(end - start).toBeLessThanOrEqual(4000);
requests.forEach(x => expect(x).toHaveBeenCalledTimes(1));
countPerSecond.forEach((value, _) => {
expect(value).toBeLessThanOrEqual(5);
});
});
});
});
});
38 changes: 29 additions & 9 deletions src/evaluators/sumo.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import axios from "axios";
import axios, { AxiosRequestConfig } from "axios";
import { parsePeriodRange } from "../lib/period-parser";
import { sleepMs } from "../lib/sleep";
import { MonitorFailureResult, Result, SumoResult } from "../models/result";
Expand All @@ -8,6 +8,7 @@ import { log } from "../models/logger";
import { IApp } from "../models/app";
import { BaseEvaluator, EvaluatorType, findTriggerRulesFor, generateValueForVariable } from "./base";
import { IUniqueKey } from "../lib/key";
import { RateLimiter } from "../lib/rate-limiter";

const SumoDomain = process.env["sumo-domain"] ?? "api.eu.sumologic.com";
const SumoUrl = `https://${ SumoDomain }/api/v1/search/jobs`;
Expand All @@ -17,6 +18,10 @@ const JobPollMillis = 1000;
// that may lead to rate limits being reached
const JobInitialPollMillis = 3000;

// sumo logic has strict concurrency rules, limited to 10 per key - let's be cautious
const MaxSumoConcurrency = 3;
const MaxSumoRequestsPerSecond = 5;

export class SumoEvaluator extends BaseEvaluator {
constructor(config: any) {
super(config);
Expand Down Expand Up @@ -145,31 +150,34 @@ async function startSearch(app, log) {
timeZone: "UTC",
autoParsingMode: "intelligent"
};
const result = await axios.post(
const result = await executeSumoRequest(app.token, () => axios.post(
SumoUrl,
search,
getHeaders(app.token));
getRequestConfig(app.token))
);
log(`started sumo job search for '${ app.name }'`, result.data);
return result.data.id;
}

async function isJobComplete(app, log) {
const startTime = +new Date();
let pollCount = 0;
await sleepMs(JobInitialPollMillis);
while (+new Date() - startTime < app.timeout) {
try {
const status = await axios.get(`${ SumoUrl }/${ app.jobId }`, getHeaders(app.token));
const status = await executeSumoRequest(app.token, () => axios.get(`${ SumoUrl }/${ app.jobId }`, getRequestConfig(app.token)));
if (status.data.state.match(/done gathering results/i)) {
return status.data;
}
await sleepMs(pollCount === 0 ? JobInitialPollMillis : JobPollMillis);
await sleepMs(JobPollMillis);
pollCount++;
} catch (err) {
await deleteJob(app, log);
throw err;
}
}
const timedOutAfter = +new Date() - startTime;
await sleepMs(JobPollMillis);
// job failed
try {
await deleteJob(app, log);
Expand All @@ -180,7 +188,7 @@ async function isJobComplete(app, log) {

async function getSearchResult(app, log) {
try {
const result = await axios.get(`${ SumoUrl }/${ app.jobId }/records?offset=0&limit=100`, getHeaders(app.token));
const result = await executeSumoRequest(app.token, () => axios.get(`${ SumoUrl }/${ app.jobId }/records?offset=0&limit=100`, getRequestConfig(app.token)));
log(`successfully completed sumo job search for '${ app.name }', result:`, result.data);
return result.data;
} catch(err) {
Expand All @@ -191,18 +199,19 @@ async function getSearchResult(app, log) {

async function deleteJob(app, log) {
try {
await axios.delete(`${ SumoUrl }/${ app.jobId }`, getHeaders(app.token));
await executeSumoRequest(app.token, () => axios.delete(`${ SumoUrl }/${ app.jobId }`, getRequestConfig(app.token)));
} catch (error) {
log("error: could not delete job", { app, error });
// no-op
}
}

function getHeaders(tokenName) {
function getRequestConfig(tokenName): AxiosRequestConfig {
if (!process.env[tokenName]) {
throw new Error(`missing sumo logic env var with name '${ tokenName }'`);
}
return {
timeout: 10000,
headers: {
"Content-Type": "application/json",
Accept: "application/json",
Expand All @@ -211,6 +220,17 @@ function getHeaders(tokenName) {
};
}

function toSumoTime(date) {
function toSumoTime(date: Date): string {
return date.toISOString().split(".")[0];
}

const rateLimiters = new Map<string, RateLimiter>();

export async function executeSumoRequest<T>(
key: string,
request: () => Promise<T>): Promise<T> {
let limiter = rateLimiters.get(key) ?? new RateLimiter(MaxSumoRequestsPerSecond, MaxSumoConcurrency);
rateLimiters.set(key, limiter);
return await limiter.execute(request);
}

115 changes: 115 additions & 0 deletions src/lib/rate-limiter.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import { RateLimiter } from "./rate-limiter";

describe('rate-limiter', () => {
describe('execute', () => {
describe("when executed with success", () => {
it("should return result", async () => {
// arrange
const request = jest.fn().mockResolvedValue("result");
const sut = getSut();;

// act
const result = await sut.execute(request);

// assert
expect(result).toEqual("result");
expect(request).toHaveBeenCalledTimes(1);
});
});
describe("when failure", () => {
it("should throw error", async () => {
// arrange
const request = jest.fn().mockRejectedValue(new Error("error"));
const sut = getSut();;

// act
let error;
try {
await sut.execute(request);
} catch (err) {
error = err;
}

// assert
expect(error).toEqual(new Error("error"));
expect(request).toHaveBeenCalledTimes(1);
});
});
describe("when request takes some time", () => {
it("should wait", async () => {
// arrange
const sut = getSut();;
const request = jest.fn().mockImplementation(() => new Promise(resolve => setTimeout(() => resolve("result"), 100)));

// act
const start = performance.now();
const result = await sut.execute(request);
const end = performance.now();

// assert
expect(result).toEqual("result");
expect(end - start).toBeGreaterThanOrEqual(95);
expect(request).toHaveBeenCalledTimes(1);
});
});
describe("when multiple requests sent", () => {
it("should queue them only execute x in parallel", async () => {
const maxPerSec = 100;
const maxConcurrent = 1;
const sut = getSut(maxPerSec, maxConcurrent);
const count = 3;
const requests = [];
for (let i = 0; i < count; i++) {
const req = jest.fn().mockImplementation(() => new Promise(resolve => {
setTimeout(() => resolve(`result${ i }`), 500)
}));
requests.push(req);
}

// act
const start = performance.now();
const result = await Promise.all(requests.map(x => sut.execute(x)));
const end = performance.now();

// assert
expect(result).toEqual(requests.map((_x,i) => `result${ i }`));
expect(end - start).toBeGreaterThanOrEqual(1500);
requests.forEach(x => expect(x).toHaveBeenCalledTimes(1));
});
it("should queue them and only execute x per second", async () => {
// arrange
const maxPerSec = 5;
const sut = getSut(maxPerSec);
const count = 20;
const requests = [];
const countPerSecond = new Map<number, number>();
for (let i = 0; i < count; i++) {
const req = jest.fn().mockImplementation(() => new Promise(resolve => {
const time = Math.round(performance.now() / 1000);
const count = countPerSecond.get(time) ?? 0;
countPerSecond.set(time, count + 1);
setTimeout(() => resolve(`result${ i }`), 100)
}));
requests.push(req);
}

// act
const start = performance.now();
const result = await Promise.all(requests.map(x => sut.execute(x)));
const end = performance.now();

// assert
expect(result).toEqual(requests.map((_x,i) => `result${ i }`));
expect(end - start).toBeGreaterThanOrEqual(2000);
expect(end - start).toBeLessThanOrEqual(4000);
requests.forEach(x => expect(x).toHaveBeenCalledTimes(1));
countPerSecond.forEach((value, _) => {
expect(value).toBeLessThanOrEqual(maxPerSec);
});
});
});
});
function getSut(perSecond = null, concurrent = null) {
return new RateLimiter(perSecond ?? 5, concurrent ?? 3);
}
});
Loading
Loading