Skip to content

Commit

Permalink
Merge pull request #9 from Rohland/avoid-sumo-429
Browse files Browse the repository at this point in the history
Add rate limiting to Sumo Logic calls
  • Loading branch information
Rohland authored Apr 15, 2024
2 parents 408dc5a + 0917495 commit d08487f
Show file tree
Hide file tree
Showing 5 changed files with 310 additions and 10 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "barky",
"version": "1.1.9",
"version": "1.1.11",
"description": "A simple cloud services watchdog with digest notification support & no external dependencies",
"homepage": "https://github.com/Rohland/barky#readme",
"main": "dist/cli.js",
Expand Down
84 changes: 84 additions & 0 deletions src/evaluators/sumo.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import { executeSumoRequest } from "./sumo";

describe('sumo ', () => {
describe('executeSumoRequest', () => {
describe("when executed with success", () => {
it("should return result", async () => {
// arrange
const request = jest.fn().mockResolvedValue("result");

// act
const result = await executeSumoRequest("test", request);

// assert
expect(result).toEqual("result");
expect(request).toHaveBeenCalledTimes(1);
});
});
describe("when failure", () => {
it("should throw error", async () => {
// arrange
const request = jest.fn().mockRejectedValue(new Error("error"));

// act
let error;
try {
await executeSumoRequest("test", request);
} catch (err) {
error = err;
}

// assert
expect(error).toEqual(new Error("error"));
expect(request).toHaveBeenCalledTimes(1);
});
});
describe("when request takes some time", () => {
it("should wait", async () => {
// arrange
const request = jest.fn().mockImplementation(() => new Promise(resolve => setTimeout(() => resolve("result"), 100)));

// act
const start = performance.now();
const result = await executeSumoRequest("test", request);
const end = performance.now();

// assert
expect(result).toEqual("result");
expect(end - start).toBeGreaterThanOrEqual(95);
expect(request).toHaveBeenCalledTimes(1);
});
});
describe("when multiple requests sent", () => {
it("should queue them and only execute 5 per second", async () => {
// arrange
const count = 20;
const requests = [];
const countPerSecond = new Map<number, number>();
for (let i = 0; i < count; i++) {
const req = jest.fn().mockImplementation(() => new Promise(resolve => {
const time = Math.round(performance.now() / 1000);
const count = countPerSecond.get(time) ?? 0;
countPerSecond.set(time, count + 1);
setTimeout(() => resolve(`result${ i }`), 100)
}));
requests.push(req);
}

// act
const start = performance.now();
const result = await Promise.all(requests.map(x => executeSumoRequest("test", x)));
const end = performance.now();

// assert
expect(result).toEqual(requests.map((_x,i) => `result${ i }`));
expect(end - start).toBeGreaterThanOrEqual(2000);
expect(end - start).toBeLessThanOrEqual(4000);
requests.forEach(x => expect(x).toHaveBeenCalledTimes(1));
countPerSecond.forEach((value, _) => {
expect(value).toBeLessThanOrEqual(5);
});
});
});
});
});
38 changes: 29 additions & 9 deletions src/evaluators/sumo.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import axios from "axios";
import axios, { AxiosRequestConfig } from "axios";
import { parsePeriodRange } from "../lib/period-parser";
import { sleepMs } from "../lib/sleep";
import { MonitorFailureResult, Result, SumoResult } from "../models/result";
Expand All @@ -8,6 +8,7 @@ import { log } from "../models/logger";
import { IApp } from "../models/app";
import { BaseEvaluator, EvaluatorType, findTriggerRulesFor, generateValueForVariable } from "./base";
import { IUniqueKey } from "../lib/key";
import { RateLimiter } from "../lib/rate-limiter";

const SumoDomain = process.env["sumo-domain"] ?? "api.eu.sumologic.com";
const SumoUrl = `https://${ SumoDomain }/api/v1/search/jobs`;
Expand All @@ -17,6 +18,10 @@ const JobPollMillis = 1000;
// that may lead to rate limits being reached
const JobInitialPollMillis = 3000;

// sumo logic has strict concurrency rules, limited to 10 per key - let's be cautious
const MaxSumoConcurrency = 3;
const MaxSumoRequestsPerSecond = 5;

export class SumoEvaluator extends BaseEvaluator {
constructor(config: any) {
super(config);
Expand Down Expand Up @@ -145,31 +150,34 @@ async function startSearch(app, log) {
timeZone: "UTC",
autoParsingMode: "intelligent"
};
const result = await axios.post(
const result = await executeSumoRequest(app.token, () => axios.post(
SumoUrl,
search,
getHeaders(app.token));
getRequestConfig(app.token))
);
log(`started sumo job search for '${ app.name }'`, result.data);
return result.data.id;
}

async function isJobComplete(app, log) {
const startTime = +new Date();
let pollCount = 0;
await sleepMs(JobInitialPollMillis);
while (+new Date() - startTime < app.timeout) {
try {
const status = await axios.get(`${ SumoUrl }/${ app.jobId }`, getHeaders(app.token));
const status = await executeSumoRequest(app.token, () => axios.get(`${ SumoUrl }/${ app.jobId }`, getRequestConfig(app.token)));
if (status.data.state.match(/done gathering results/i)) {
return status.data;
}
await sleepMs(pollCount === 0 ? JobInitialPollMillis : JobPollMillis);
await sleepMs(JobPollMillis);
pollCount++;
} catch (err) {
await deleteJob(app, log);
throw err;
}
}
const timedOutAfter = +new Date() - startTime;
await sleepMs(JobPollMillis);
// job failed
try {
await deleteJob(app, log);
Expand All @@ -180,7 +188,7 @@ async function isJobComplete(app, log) {

async function getSearchResult(app, log) {
try {
const result = await axios.get(`${ SumoUrl }/${ app.jobId }/records?offset=0&limit=100`, getHeaders(app.token));
const result = await executeSumoRequest(app.token, () => axios.get(`${ SumoUrl }/${ app.jobId }/records?offset=0&limit=100`, getRequestConfig(app.token)));
log(`successfully completed sumo job search for '${ app.name }', result:`, result.data);
return result.data;
} catch(err) {
Expand All @@ -191,18 +199,19 @@ async function getSearchResult(app, log) {

async function deleteJob(app, log) {
try {
await axios.delete(`${ SumoUrl }/${ app.jobId }`, getHeaders(app.token));
await executeSumoRequest(app.token, () => axios.delete(`${ SumoUrl }/${ app.jobId }`, getRequestConfig(app.token)));
} catch (error) {
log("error: could not delete job", { app, error });
// no-op
}
}

function getHeaders(tokenName) {
function getRequestConfig(tokenName): AxiosRequestConfig {
if (!process.env[tokenName]) {
throw new Error(`missing sumo logic env var with name '${ tokenName }'`);
}
return {
timeout: 10000,
headers: {
"Content-Type": "application/json",
Accept: "application/json",
Expand All @@ -211,6 +220,17 @@ function getHeaders(tokenName) {
};
}

function toSumoTime(date) {
function toSumoTime(date: Date): string {
return date.toISOString().split(".")[0];
}

const rateLimiters = new Map<string, RateLimiter>();

export async function executeSumoRequest<T>(
key: string,
request: () => Promise<T>): Promise<T> {
let limiter = rateLimiters.get(key) ?? new RateLimiter(MaxSumoRequestsPerSecond, MaxSumoConcurrency);
rateLimiters.set(key, limiter);
return await limiter.execute(request);
}

115 changes: 115 additions & 0 deletions src/lib/rate-limiter.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import { RateLimiter } from "./rate-limiter";

describe('rate-limiter', () => {
describe('execute', () => {
describe("when executed with success", () => {
it("should return result", async () => {
// arrange
const request = jest.fn().mockResolvedValue("result");
const sut = getSut();;

// act
const result = await sut.execute(request);

// assert
expect(result).toEqual("result");
expect(request).toHaveBeenCalledTimes(1);
});
});
describe("when failure", () => {
it("should throw error", async () => {
// arrange
const request = jest.fn().mockRejectedValue(new Error("error"));
const sut = getSut();;

// act
let error;
try {
await sut.execute(request);
} catch (err) {
error = err;
}

// assert
expect(error).toEqual(new Error("error"));
expect(request).toHaveBeenCalledTimes(1);
});
});
describe("when request takes some time", () => {
it("should wait", async () => {
// arrange
const sut = getSut();;
const request = jest.fn().mockImplementation(() => new Promise(resolve => setTimeout(() => resolve("result"), 100)));

// act
const start = performance.now();
const result = await sut.execute(request);
const end = performance.now();

// assert
expect(result).toEqual("result");
expect(end - start).toBeGreaterThanOrEqual(95);
expect(request).toHaveBeenCalledTimes(1);
});
});
describe("when multiple requests sent", () => {
it("should queue them only execute x in parallel", async () => {
const maxPerSec = 100;
const maxConcurrent = 1;
const sut = getSut(maxPerSec, maxConcurrent);
const count = 3;
const requests = [];
for (let i = 0; i < count; i++) {
const req = jest.fn().mockImplementation(() => new Promise(resolve => {
setTimeout(() => resolve(`result${ i }`), 500)
}));
requests.push(req);
}

// act
const start = performance.now();
const result = await Promise.all(requests.map(x => sut.execute(x)));
const end = performance.now();

// assert
expect(result).toEqual(requests.map((_x,i) => `result${ i }`));
expect(end - start).toBeGreaterThanOrEqual(1500);
requests.forEach(x => expect(x).toHaveBeenCalledTimes(1));
});
it("should queue them and only execute x per second", async () => {
// arrange
const maxPerSec = 5;
const sut = getSut(maxPerSec);
const count = 20;
const requests = [];
const countPerSecond = new Map<number, number>();
for (let i = 0; i < count; i++) {
const req = jest.fn().mockImplementation(() => new Promise(resolve => {
const time = Math.round(performance.now() / 1000);
const count = countPerSecond.get(time) ?? 0;
countPerSecond.set(time, count + 1);
setTimeout(() => resolve(`result${ i }`), 100)
}));
requests.push(req);
}

// act
const start = performance.now();
const result = await Promise.all(requests.map(x => sut.execute(x)));
const end = performance.now();

// assert
expect(result).toEqual(requests.map((_x,i) => `result${ i }`));
expect(end - start).toBeGreaterThanOrEqual(2000);
expect(end - start).toBeLessThanOrEqual(4000);
requests.forEach(x => expect(x).toHaveBeenCalledTimes(1));
countPerSecond.forEach((value, _) => {
expect(value).toBeLessThanOrEqual(maxPerSec);
});
});
});
});
function getSut(perSecond = null, concurrent = null) {
return new RateLimiter(perSecond ?? 5, concurrent ?? 3);
}
});
Loading

0 comments on commit d08487f

Please sign in to comment.