Skip to content

Commit

Permalink
Merge pull request #43 from pagopa/IOPLT-385_add_support_for_managed_…
Browse files Browse the repository at this point in the history
…identity_on_evh_internal_client

[#IOPLT-385] Add managed identity support for evh internal client
  • Loading branch information
AleDore authored Mar 22, 2024
2 parents 2137afc + 3371d69 commit 6a1488b
Show file tree
Hide file tree
Showing 6 changed files with 754 additions and 315 deletions.
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
"typescript": "^4.3.5"
},
"dependencies": {
"@azure/identity": "^4.0.1",
"@azure/cosmos": "^4.0.0",
"@azure/event-hubs": "^5.11.3",
"@pagopa/data-indexer-commons": "^0.3.3",
"@azure/data-tables": "^13.2.2",
"@azure/storage-blob": "^12.17.0",
Expand Down
59 changes: 52 additions & 7 deletions src/queue/__test__/service.test.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,34 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import * as KafkaConsumerUtils from "@pagopa/fp-ts-kafkajs/dist/lib/KafkaConsumerCompact";
import { KafkaConsumerCompact } from "@pagopa/fp-ts-kafkajs/dist/lib/KafkaConsumerCompact";
import * as E from "fp-ts/Either";
import * as TE from "fp-ts/TaskEither";
import * as EventHubUtils from "../eventhub/utils";
import { createEventHubService } from "../service";
import { createEventHubService, createNativeEventHubService } from "../service";

jest.mock("../eventhub/utils");
const getEventHubConsumerSpy = jest.spyOn(EventHubUtils, "getEventHubConsumer");
const getEventHubConsumerSpy = jest
.spyOn(EventHubUtils, "getEventHubConsumer")
.mockReturnValue(E.right({} as KafkaConsumerCompact));

const nativeConsumerMock = {
subscribe: jest.fn()
} as any;
const getNativeEventHubConsumerSpy = jest
.spyOn(EventHubUtils, "getNativeEventHubConsumer")
.mockReturnValue(E.right(nativeConsumerMock));
const readMessageSpy = jest.spyOn(KafkaConsumerUtils, "read");
const connectionString = "your_connection_string";
const mockError = new Error("Failed to get event hub consumer");
const mockConsumer = {} as KafkaConsumerCompact;
const mockMessageHandler = jest.fn().mockImplementation(() => TE.of(void 0));
describe("EventHubService", () => {
it("should create EventHubService", async () => {
getEventHubConsumerSpy.mockImplementationOnce(() => E.right(mockConsumer));
// eslint-disable-next-line @typescript-eslint/explicit-function-return-type
readMessageSpy.mockImplementationOnce(_ => () => TE.right(void 0));

const result = createEventHubService(connectionString);
const result = createEventHubService({
connectionString,
queueType: "BASIC_KAFKA"
} as any)(mockMessageHandler);

expect(getEventHubConsumerSpy).toHaveBeenCalledWith(connectionString);
expect(result).toEqual(
Expand All @@ -28,9 +39,43 @@ describe("EventHubService", () => {
it("should return an error when getEventHubConsumer fails", async () => {
getEventHubConsumerSpy.mockImplementationOnce(() => E.left(mockError));

const result = createEventHubService(connectionString);
const result = createEventHubService({
connectionString,
queueType: "BASIC_KAFKA"
} as any)(mockMessageHandler);

expect(getEventHubConsumerSpy).toHaveBeenCalledWith(connectionString);
expect(result).toEqual(E.left(mockError));
});
});

describe("NativeEventHubService", () => {
it("should create NativeEventHubService", async () => {
// eslint-disable-next-line @typescript-eslint/explicit-function-return-type
readMessageSpy.mockImplementationOnce(_ => () => TE.right(void 0));

const result = createNativeEventHubService({
connectionString,
queueType: "NATIVE_EVH"
})(mockMessageHandler);

expect(getNativeEventHubConsumerSpy).toHaveBeenCalledWith(connectionString);
expect(result).toEqual(
E.right(expect.objectContaining({ consumeMessage: expect.any(Function) }))
);
});

it("should return an error when getNativeEventHubConsumer fails", async () => {
getNativeEventHubConsumerSpy.mockImplementationOnce(() =>
E.left(mockError)
);

const result = createNativeEventHubService({
connectionString,
queueType: "NATIVE_EVH"
} as any)(mockMessageHandler);

expect(getNativeEventHubConsumerSpy).toHaveBeenCalledWith(connectionString);
expect(result).toEqual(E.left(mockError));
});
});
46 changes: 45 additions & 1 deletion src/queue/eventhub/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import {
getConsumerFromSas
} from "@pagopa/fp-ts-kafkajs/dist/lib/KafkaConsumerCompact";
import { AzureEventhubSasFromString } from "@pagopa/fp-ts-kafkajs/dist/lib/KafkaProducerCompact";

import { EventHubConsumerClient } from "@azure/event-hubs";
import { DefaultAzureCredential } from "@azure/identity";
import { defaultLog } from "@pagopa/winston-ts";
import * as E from "fp-ts/Either";
import * as O from "fp-ts/Option";
import { pipe } from "fp-ts/function";

export const getEventHubConsumer = (
Expand All @@ -25,3 +27,45 @@ export const getEventHubConsumer = (
getConsumerFromSas
)
);

export const getNativeEventHubConsumer = (
connectionString: string,
consumerGroup?: string
): E.Either<Error, EventHubConsumerClient> =>
pipe(
AzureEventhubSasFromString.decode(connectionString),
E.map(() =>
pipe(
consumerGroup,
O.fromNullable,
O.getOrElse(() => "consumer-group"),
groupId => new EventHubConsumerClient(groupId, connectionString)
)
),
E.mapLeft(() => new Error(`Error during decoding Event Hub SAS`))
);

export const getPasswordLessNativeEventHubConsumer = (
hostName: string,
topicName: string,
consumerGroup?: string
): E.Either<Error, EventHubConsumerClient> =>
pipe(
new DefaultAzureCredential(),
credentials =>
E.right(
pipe(
consumerGroup,
O.fromNullable,
O.getOrElse(() => "consumer-group"),
groupId =>
new EventHubConsumerClient(
groupId,
hostName,
topicName,
credentials
)
)
),
E.mapLeft(() => new Error(`Error during decoding Event Hub SAS`))
);
127 changes: 108 additions & 19 deletions src/queue/service.ts
Original file line number Diff line number Diff line change
@@ -1,39 +1,128 @@
import {
KafkaConsumerCompact,
ReadType,
RunnerConfig,
defaultRunner,
read
} from "@pagopa/fp-ts-kafkajs/dist/lib/KafkaConsumerCompact";
import * as E from "fp-ts/Either";
import * as TE from "fp-ts/TaskEither";
import { pipe } from "fp-ts/function";
import { EachMessageHandler } from "kafkajs";
import { getEventHubConsumer } from "./eventhub/utils";
export type MessageHandler = EachMessageHandler;
export type QueueConsumer = KafkaConsumerCompact;
export interface IQueueService {
readonly consumeMessage: (
topic: string,
runnerConfig: RunnerConfig
) => TE.TaskEither<Error, void>;
}
import * as AR from "fp-ts/Array";
import * as J from "fp-ts/Json";
import { constVoid, flow, pipe } from "fp-ts/function";
import { earliestEventPosition } from "@azure/event-hubs";
import { EventHubConsumerClient } from "@azure/event-hubs";
import { toJsonObject } from "../utils/data";
import {
IQueueService,
KafkaParams,
NativeEvhParams,
PasswordLessNativeEvhParams
} from "../types/evh";
import {
getEventHubConsumer,
getNativeEventHubConsumer,
getPasswordLessNativeEventHubConsumer
} from "./eventhub/utils";

const defaultConsumeMessage = (consumer: KafkaConsumerCompact) => (
topic: string,
runnerConfig: RunnerConfig
): TE.TaskEither<Error, void> =>
read(consumer)({ topics: [topic] }, defaultRunner, runnerConfig);

export const eventHubService = {
consumeMessage: read
};

export const createEventHubService = (
connectionString: string
export const createEventHubService = (params: KafkaParams) => (
messageHandler: (
message: Record<string, unknown>
) => TE.TaskEither<Error, void>
): E.Either<Error, IQueueService> =>
pipe(
getEventHubConsumer(connectionString),
getEventHubConsumer(params.connectionString),
E.map(consumer => ({
consumeMessage: defaultConsumeMessage(consumer)
consumeMessage: defaultConsumeMessage(consumer)(params.topicName, {
...params.runnerConfigOptions,
handler: eachBatchPayload =>
pipe(
eachBatchPayload.batch.messages,
AR.map(
flow(
msg => msg.value,
buf => buf.toString(),
J.parse,
E.mapLeft(err =>
Error(`Cannot decode Kafka Message|ERROR=${String(err)}`)
),
E.map(toJsonObject),
TE.fromEither,
TE.chain(messageHandler)
)
),
AR.sequence(TE.ApplicativeSeq),
TE.mapLeft(err => {
throw err;
}),
TE.map(constVoid),
TE.toUnion
)(),
readType: ReadType.Message
})
}))
);

const getEvhSubscriber = (
messageHandler: (
message: Record<string, unknown>
) => TE.TaskEither<Error, void>
// eslint-disable-next-line @typescript-eslint/explicit-function-return-type
) => (consumer: EventHubConsumerClient): TE.TaskEither<Error, void> =>
pipe(
consumer.subscribe(
{
processError: async (_err, _context) => {
// error reporting/handling code here
},
processEvents: async (events, _) =>
pipe(
events,
AR.map(msgEvt => messageHandler(msgEvt.body)),
AR.sequence(TE.ApplicativeSeq),
TE.mapLeft(err => {
throw err;
}),
TE.map(constVoid),
TE.toUnion
)()
},
{ maxBatchSize: 1, startPosition: earliestEventPosition }
),
TE.of,
TE.map(constVoid)
);

export const createNativeEventHubService = (params: NativeEvhParams) => (
messageHandler: (
message: Record<string, unknown>
) => TE.TaskEither<Error, void>
): E.Either<Error, IQueueService> =>
pipe(
getNativeEventHubConsumer(params.connectionString),
E.map(getEvhSubscriber(messageHandler)),
E.map(consumeMessage => ({
consumeMessage
}))
);

export const createPasswordlessNativeEventHubService = (
params: PasswordLessNativeEvhParams
) => (
messageHandler: (
message: Record<string, unknown>
) => TE.TaskEither<Error, void>
): E.Either<Error, IQueueService> =>
pipe(
getPasswordLessNativeEventHubConsumer(params.hostName, params.topicName),
E.map(getEvhSubscriber(messageHandler)),
E.map(consumeMessage => ({
consumeMessage
}))
);
59 changes: 59 additions & 0 deletions src/types/evh.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import { KafkaConsumerCompact } from "@pagopa/fp-ts-kafkajs/dist/lib/KafkaConsumerCompact";
import { EachMessageHandler } from "kafkajs";
import * as TE from "fp-ts/TaskEither";
import * as t from "io-ts";

const BaseQueueParams = t.type({
connectionString: t.string
});
export type BaseQueueParams = t.TypeOf<typeof BaseQueueParams>;

export const RunnerConfigOptions = t.type({
autoCommit: t.boolean,
autoCommitInterval: t.number,
autoCommitThreshold: t.number,
eachBatchAutoResolve: t.boolean,
partitionsConsumedConcurrently: t.number
});
export type RunnerConfigOptions = t.TypeOf<typeof RunnerConfigOptions>;

export const KafkaParams = t.intersection([
BaseQueueParams,
t.type({
queueType: t.literal("BASIC_KAFKA"),
runnerConfigOptions: RunnerConfigOptions,
topicName: t.string
})
]);
export type KafkaParams = t.TypeOf<typeof KafkaParams>;

export const NativeEvhParams = t.intersection([
BaseQueueParams,
t.type({
queueType: t.literal("NATIVE_EVH")
})
]);
export type NativeEvhParams = t.TypeOf<typeof NativeEvhParams>;

export const PasswordLessNativeEvhParams = t.type({
hostName: t.string,
queueType: t.literal("PASSWORDLESS_EVH"),
topicName: t.string
});
export type PasswordLessNativeEvhParams = t.TypeOf<
typeof PasswordLessNativeEvhParams
>;

export const QueueParams = t.union([
KafkaParams,
NativeEvhParams,
PasswordLessNativeEvhParams
]);
export type QueueParams = t.TypeOf<typeof QueueParams>;

export type MessageHandler = EachMessageHandler;
export type QueueConsumer = KafkaConsumerCompact;

export interface IQueueService {
readonly consumeMessage: TE.TaskEither<Error, void>;
}
Loading

0 comments on commit 6a1488b

Please sign in to comment.