Skip to content

Commit

Permalink
[TH2-5007] MessageSubscriber supports single listener (#275)
Browse files Browse the repository at this point in the history
* The unsubscribe method of SubscribtionMonitor interface cancels RabbitMQ subscribtion
  • Loading branch information
Nikita-Smirnov-Exactpro authored Sep 6, 2023
1 parent 5bcdf62 commit c729897
Show file tree
Hide file tree
Showing 24 changed files with 725 additions and 274 deletions.
45 changes: 44 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# th2 common library (Java) (5.4.0)
# th2 common library (Java) (5.4.1)

## Usage

Expand Down Expand Up @@ -367,6 +367,45 @@ It means that the users can execute calls from the console or through scripts
via [grpcurl](https://github.com/fullstorydev/grpcurl#grpcurl) without gRPC schema (files with proto extensions
describes gRPC service structure)

## MQ router

This kind of router provides the ability for component to send / receive messages via RabbitMQ.
Router has several methods to subscribe and publish RabbitMQ messages steam (th2 use batches of messages or events as transport).

#### Choice pin by attributes

Pin attributes are key mechanism to choose pin for action execution. Router search all pins which have full set of passed attributes.
For example, the pins: `first` [`publish`, `raw`, `custom_a` ], `second` [`publish`, `raw`, `custom_b` ].
* only the `first` pin will be chosen by attribut sets: [`custom_a`], [`custom_a`, `raw`], [`custom_a`, `publish`], [`publish`, `raw`, `custom_a` ]
* both pins will be chosen by attribut sets: [`raw`], [`publish`], [`publish`, `raw` ]

Router implementation and methods have predefined attributes. Result set of attributes for searching pin is union of <router>, <method>, <passed> attributes.
Predefined attributes:
* `RabbitMessageGroupBatchRouter` hasn't got any predefined attributes
* `EventBatchRouter` has `evnet` attribute
* `TransportGroupBatchRouter` has `transport-group` attribute

* `send*` exclude `sendExclusive` methods have `publish` attribute
* `subscribe*` excluded `subscribeExclusive` methods have `subscribe` attribute

#### Choice publish pin

Router chooses pins in two stages. At first select all pins matched by attributes than check passed message (batch) by
pin's filters and then send the whole message or its part via pins leaved after two steps.

#### Choice subscribe pin

Router chooses pins only by attributes. Pin's filters are used when message has been delivered and parsed. Registered lister doesn't receive message, or it parts failure check by pin's filter.

### Restrictions:

Some methods have `All` suffix, it means that developer can publish or subscribe message via 1 or several pins otherwise via only 1 pin.
If number of passed check pins are different then required range, method throw an exception.

Developer can register only one listener for each pin but one listener can handle messages from several pins.

`TransportGroupBatchRouter` router doesn't split incoming or outgoing batch by filter not unlike `RabbitMessageGroupBatchRouter` router

## Export common metrics to Prometheus

It can be performed by the following utility methods in CommonMetrics class
Expand Down Expand Up @@ -455,6 +494,10 @@ dependencies {

## Release notes

### 5.4.1-dev
#### Fix
+ `SubscriberMonitor` is returned from `MessageRouter.subscribe` methods is proxy object to manage RabbitMQ subscribtion without internal listener

### 5.4.0-dev
#### Updated
+ bom: `4.4.0-dev` to `4.5.0-dev`
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
release_version=5.4.0
release_version=5.4.1
description='th2 common library (Java)'
vcs_url=https://github.com/th2-net/th2-common-j
kapt.include.compile.classpath=false
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2023 Exactpro (Exactpro Systems Limited)
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand All @@ -17,6 +17,7 @@

import java.util.Set;

import com.exactpro.th2.common.schema.message.ConfirmationListener;
import org.apache.commons.collections4.SetUtils;
import org.jetbrains.annotations.NotNull;

Expand Down Expand Up @@ -70,11 +71,16 @@ protected MessageSender<EventBatch> createSender(QueueConfiguration queueConfigu

@NotNull
@Override
protected MessageSubscriber<EventBatch> createSubscriber(QueueConfiguration queueConfiguration, @NotNull String pinName) {
protected MessageSubscriber createSubscriber(
QueueConfiguration queueConfiguration,
@NotNull String pinName,
@NotNull ConfirmationListener<EventBatch> listener
) {
return new EventBatchSubscriber(
getConnectionManager(),
queueConfiguration.getQueue(),
pinName
pinName,
listener
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2023 Exactpro (Exactpro Systems Limited)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,8 +17,8 @@
package com.exactpro.th2.common.schema.event;

import com.exactpro.th2.common.grpc.EventBatch;
import com.exactpro.th2.common.schema.message.ConfirmationListener;
import com.exactpro.th2.common.schema.message.DeliveryMetadata;
import com.exactpro.th2.common.schema.message.FilterFunction;
import com.exactpro.th2.common.schema.message.ManualAckDeliveryCallback.Confirmation;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.AbstractRabbitSubscriber;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager;
Expand All @@ -43,9 +43,10 @@ public class EventBatchSubscriber extends AbstractRabbitSubscriber<EventBatch> {
public EventBatchSubscriber(
@NotNull ConnectionManager connectionManager,
@NotNull String queue,
@NotNull String th2Pin
@NotNull String th2Pin,
@NotNull ConfirmationListener<EventBatch> listener
) {
super(connectionManager, queue, th2Pin, EVENT_TYPE);
super(connectionManager, queue, th2Pin, EVENT_TYPE, listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@
/**
* Abstract implementation for {@link GrpcRouter}
* <p>
* Implement {@link GrpcRouter#init(GrpcRouterConfiguration)}
* <p>
* Implement {@link GrpcRouter#init(GrpcConfiguration, GrpcRouterConfiguration)}
* <p>
* Implement {@link GrpcRouter#startServer(BindableService...)}
Expand Down Expand Up @@ -122,11 +120,6 @@ public abstract class AbstractGrpcRouter implements GrpcRouter {

protected static final Map<MethodDetails, Counter.Child> GRPC_RECEIVE_CALL_RESPONSE_SIZE_MAP = new ConcurrentHashMap<>();

@Override
public void init(GrpcRouterConfiguration configuration) {
init(new GrpcConfiguration(), configuration);
}

@Override
public void init(@NotNull GrpcConfiguration configuration, @NotNull GrpcRouterConfiguration routerConfiguration) {
failIfInitialized();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2023 Exactpro (Exactpro Systems Limited)
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand All @@ -26,11 +26,6 @@
* @see AbstractGrpcRouter
*/
public interface GrpcRouter extends AutoCloseable {
/**
* Initialization router
*/
@Deprecated(since = "3.9.0", forRemoval = true)
void init(GrpcRouterConfiguration configuration);

void init(@NotNull GrpcConfiguration configuration, @NotNull GrpcRouterConfiguration routerConfiguration);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,28 +61,36 @@ default void init(@NotNull MessageRouterContext context, @NotNull MessageRouter<
ExclusiveSubscriberMonitor subscribeExclusive(MessageListener<T> callback);

/**
* Listen <b>ONE</b> RabbitMQ queue by intersection schemas queues attributes
* Listen <b>ONE</b> RabbitMQ queue by intersection schemas queues attributes.
* Restrictions:
* You can create only one subscription to th2 pin using any subscribe* functions.
* Internal state:
* Router uses external Connection Manage to interact with RabbitMQ, which holds one connection and one channel per th2 pin in general.
* This rule exception is re-connect to RabbitMQ when the manager establishes new connection and creates new channels.
* @param callback listener
* @param queueAttr queues attributes
* @throws IllegalStateException when more than 1 queue is found
* @return {@link SubscriberMonitor} it start listening. Returns null if can not listen to this queue
* @throws RuntimeException when the th2 pin is matched by passed attributes already has active subscription
* @return {@link SubscriberMonitor} it start listening.
*/
SubscriberMonitor subscribe(MessageListener<T> callback, String... queueAttr);

/**
* Listen <b>SOME</b> RabbitMQ queues by intersection schemas queues attributes
* @see #subscribe(MessageListener, String...)
* @param callback listener
* @param queueAttr queues attributes
* @return {@link SubscriberMonitor} it start listening. Returns null if can not listen to this queue
* @return {@link SubscriberMonitor} it start listening.
*/
SubscriberMonitor subscribeAll(MessageListener<T> callback, String... queueAttr);

/**
* Listen <b>ONE</b> RabbitMQ queue by intersection schemas queues attributes
* @see #subscribe(MessageListener, String...)
* @param queueAttr queues attributes
* @param callback listener with manual confirmation
* @throws IllegalStateException when more than 1 queue is found
* @return {@link SubscriberMonitor} it start listening. Returns null if can not listen to this queue
* @return {@link SubscriberMonitor} it start listening.
*/
default SubscriberMonitor subscribeWithManualAck(ManualConfirmationListener<T> callback, String... queueAttr) {
// TODO: probably should not have default implementation
Expand All @@ -91,9 +99,10 @@ default SubscriberMonitor subscribeWithManualAck(ManualConfirmationListener<T> c

/**
* Listen <b>SOME</b> RabbitMQ queues by intersection schemas queues attributes
* @see #subscribe(MessageListener, String...)
* @param callback listener with manual confirmation
* @param queueAttr queues attributes
* @return {@link SubscriberMonitor} it start listening. Returns null if can not listen to this queue
* @return {@link SubscriberMonitor} it start listening.
*/
default SubscriberMonitor subscribeAllWithManualAck(ManualConfirmationListener<T> callback, String... queueAttr) {
// TODO: probably should not have default implementation
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2022 Exactpro (Exactpro Systems Limited)
* Copyright 2020-2023 Exactpro (Exactpro Systems Limited)
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
Expand All @@ -15,34 +15,10 @@

package com.exactpro.th2.common.schema.message;

import com.exactpro.th2.common.schema.message.impl.rabbitmq.configuration.SubscribeTarget;
import com.exactpro.th2.common.schema.message.impl.rabbitmq.connection.ConnectionManager;

import org.jetbrains.annotations.NotNull;

import javax.annotation.concurrent.ThreadSafe;

/**
* Listen messages and transmit it to {@link MessageListener}
*/
@ThreadSafe
public interface MessageSubscriber<T> extends AutoCloseable {
// Please use constructor for initialization
@Deprecated(since = "3.3.0", forRemoval = true)
void init(@NotNull ConnectionManager connectionManager, @NotNull String exchangeName, @NotNull SubscribeTarget subscribeTargets);

// Please use constructor for initialization
@Deprecated
void init(@NotNull ConnectionManager connectionManager, @NotNull SubscribeTarget subscribeTarget, @NotNull FilterFunction filterFunc);

/**
* @deprecated subscriber automatically subscribe after adding first listener and
* unsubscribe after remove the last one
*/
@Deprecated
void start() throws Exception;

void addListener(ConfirmationListener<T> messageListener);

void removeListener(ConfirmationListener<T> messageListener);
}
public interface MessageSubscriber extends AutoCloseable { }
Loading

0 comments on commit c729897

Please sign in to comment.