Skip to content

Commit

Permalink
Introduce native support to pass MQPutMessageOptions
Browse files Browse the repository at this point in the history
  • Loading branch information
ayeshLK committed Sep 4, 2024
1 parent 999e015 commit 5151afb
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 4 deletions.
8 changes: 6 additions & 2 deletions native/src/main/java/io/ballerina/lib/ibm.ibmmq/Queue.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQQueue;
import com.ibm.mq.constants.CMQC;
import io.ballerina.lib.ibm.ibmmq.config.GetMessageOptions;
Expand All @@ -44,13 +45,16 @@ public class Queue {
private static final ExecutorService QUEUE_EXECUTOR_SERVICE = Executors.newCachedThreadPool(
new MQThreadFactory("balx-ibmmq-queue-client-network-thread"));

public static Object put(Environment environment, BObject queueObject, BMap<BString, Object> message) {
public static Object put(Environment environment, BObject queueObject, BMap<BString, Object> message,
long options) {
MQQueue queue = (MQQueue) queueObject.getNativeData(Constants.NATIVE_QUEUE);
MQMessage mqMessage = CommonUtils.getMqMessageFromBMessage(message);
Future future = environment.markAsync();
QUEUE_EXECUTOR_SERVICE.execute(() -> {
try {
queue.put(mqMessage);
MQPutMessageOptions pmo = new MQPutMessageOptions();
pmo.options = (int) options;
queue.put(mqMessage, pmo);
future.complete(null);
} catch (MQException e) {
BError bError = createError(IBMMQ_ERROR,
Expand Down
7 changes: 5 additions & 2 deletions native/src/main/java/io/ballerina/lib/ibm.ibmmq/Topic.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.ibm.mq.MQException;
import com.ibm.mq.MQGetMessageOptions;
import com.ibm.mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com.ibm.mq.MQTopic;
import com.ibm.mq.constants.CMQC;
import io.ballerina.lib.ibm.ibmmq.config.GetMessageOptions;
Expand All @@ -44,13 +45,15 @@ public class Topic {
private static final ExecutorService topicExecutorService = Executors.newCachedThreadPool(
new MQThreadFactory("balx-ibmmq-topic-client-network-thread"));

public static Object put(Environment environment, BObject topicObject, BMap message) {
public static Object put(Environment environment, BObject topicObject, BMap message, long options) {
MQTopic topic = (MQTopic) topicObject.getNativeData(Constants.NATIVE_TOPIC);
MQMessage mqMessage = CommonUtils.getMqMessageFromBMessage(message);
Future future = environment.markAsync();
topicExecutorService.execute(() -> {
try {
topic.put(mqMessage);
MQPutMessageOptions pmo = new MQPutMessageOptions();
pmo.options = (int) options;
topic.put(mqMessage, pmo);
future.complete(null);
} catch (Exception e) {
BError bError = createError(IBMMQ_ERROR,
Expand Down

0 comments on commit 5151afb

Please sign in to comment.