Skip to content

Commit

Permalink
Add tx_recv file
Browse files Browse the repository at this point in the history
  • Loading branch information
DreamPearl committed Dec 16, 2024
1 parent eb4514b commit 347cf6e
Show file tree
Hide file tree
Showing 14 changed files with 284 additions and 160 deletions.
3 changes: 2 additions & 1 deletion cpp/examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ foreach(example
service_bus
multithreaded_client
multithreaded_client_flow_control
tx_send)
tx_send
tx_recv)
add_executable(${example} ${example}.cpp)
target_link_libraries(${example} Proton::cpp Threads::Threads)
endforeach()
Expand Down
129 changes: 129 additions & 0 deletions cpp/examples/tx_recv.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

#include "options.hpp"

#include <proton/connection.hpp>
#include <proton/container.hpp>
#include <proton/message.hpp>
#include <proton/message_id.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/types.hpp>
#include <proton/transaction.hpp>

#include <iostream>
#include <map>
#include <string>

#include <chrono>
#include <thread>

class tx_recv : public proton::messaging_handler, proton::transaction_handler {
private:
proton::receiver receiver;
std::string url;
int expected;
int batch_size;
int current_batch = 0;
int committed = 0;

proton::session session;
proton::transaction transaction;
public:
tx_recv(const std::string &s, int c, int b):
url(s), expected(c), batch_size(b) {}

void on_container_start(proton::container &c) override {
receiver = c.open_receiver(url);
}

void on_session_open(proton::session &s) override {
session = s;
std::cout << " [on_session_open] declare_txn started..." << std::endl;
s.declare_transaction(*this);
std::cout << " [on_session_open] declare_txn ended..." << std::endl;
}

void on_transaction_declare_failed(proton::transaction) {}
void on_transaction_commit_failed(proton::transaction t) {
std::cout << "Transaction Commit Failed" << std::endl;
t.connection().close();
exit(-1);
}

void on_transaction_declared(proton::transaction t) override {
std::cout << "[on_transaction_declared] txn called " << (&t)
<< std::endl;
std::cout << "[on_transaction_declared] txn is_empty " << (t.is_empty())
<< "\t" << transaction.is_empty() << std::endl;
receiver.add_credit(batch_size);
transaction = t;
}

void on_message(proton::delivery &d, proton::message &msg) override {
std::cout<<"# MESSAGE: " << msg.id() <<": " << msg.body() << std::endl;
transaction.accept(d);
current_batch += 1;
if(current_batch == batch_size) {
transaction = proton::transaction(); // null
}
}

void on_transaction_committed(proton::transaction t) override {
committed += current_batch;
current_batch = 0;
std::cout<<" [OnTxnCommitted] Committed:"<< committed<< std::endl;
if(committed == expected) {
std::cout << "All messages committed" << std::endl;
t.connection().close();
}
else {
session.declare_transaction(*this);
}
}

};

int main(int argc, char **argv) {
std::string address("127.0.0.1:5672/examples");
int message_count = 9;
int batch_size = 3;
example::options opts(argc, argv);

opts.add_value(address, 'a', "address", "connect and send to URL", "URL");
opts.add_value(message_count, 'm', "messages", "number of messages to send", "COUNT");
opts.add_value(batch_size, 'b', "batch_size", "number of messages in each transaction", "BATCH_SIZE");

try {
opts.parse();

tx_recv recv(address, message_count, batch_size);
proton::container(recv).run();

return 0;
} catch (const example::bad_option& e) {
std::cout << opts << std::endl << e.what() << std::endl;
} catch (const std::exception& e) {
std::cerr << e.what() << std::endl;
}

return 1;
}
35 changes: 16 additions & 19 deletions cpp/examples/tx_send.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,35 +48,33 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler {
int committed = 0;
int confirmed = 0;

proton::container *container;
// proton::transaction_handler transaction_handler;
proton::session session;
proton::transaction transaction;
proton::connection connection;
public:
tx_send(const std::string &s, int c, int b):
url(s), total(c), batch_size(b), sent(0) {}

void on_container_start(proton::container &c) override {
container = &c; // TODO: Fix error
sender = c.open_sender(url);
connection = sender.connection();
std::cout << " [on_container_start] declare_txn started..." << std::endl;
c.declare_transaction(connection, *this);
std::cout << " [on_container_start] completed!!" << &transaction
<< std::endl;
}

void on_session_open(proton::session &s) override {
session = s;
std::cout << " [on_session_open] declare_txn started..." << std::endl;
s.declare_transaction(*this);
std::cout << " [on_session_open] declare_txn ended..." << std::endl;
}

void on_transaction_declare_failed(proton::transaction) {}
void on_transaction_commit_failed(proton::transaction) {
void on_transaction_commit_failed(proton::transaction t) {
std::cout << "Transaction Commit Failed" << std::endl;
connection.close();
t.connection().close();
exit(-1);
}

void on_transaction_declared(proton::transaction t) override {
std::cout << "[on_transaction_declared] txn called " << (&t)
<< std::endl;
// connection.close();
std::cout << "[on_transaction_declared] txn is_empty " << (t.is_empty())
<< "\t" << transaction.is_empty() << std::endl;
transaction = t;
Expand All @@ -85,23 +83,22 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler {
}

void on_sendable(proton::sender &s) override {
// send();
std::cout << " [OnSendable] transaction: " << &transaction
<< std::endl;
send(s);
}

void send(proton::sender &s) {
// TODO: Add more condition in while loop
static int unique_id = 10000;
while (!transaction.is_empty() && sender.credit() &&
(committed + current_batch) < total) {
proton::message msg;
std::map<std::string, int> m;
m["sequence"] = committed + current_batch;

msg.id(committed + current_batch + 1);
msg.id(unique_id++);
msg.body(m);
std::cout << " [example] transaction send msg: " << msg
std::cout << "##### [example] transaction send msg: " << msg
<< std::endl;
transaction.send(sender, msg);
current_batch += 1;
Expand Down Expand Up @@ -132,17 +129,17 @@ class tx_send : public proton::messaging_handler, proton::transaction_handler {
std::cout<<" [OnTxnCommitted] Committed:"<< committed<< std::endl;
if(committed == total) {
std::cout << "All messages committed" << std::endl;
connection.close();
t.connection().close();
}
else {
container->declare_transaction(connection, *this);
session.declare_transaction(*this);
}
}

void on_transaction_aborted(proton::transaction t) override {
std::cout << "Meesages Aborted ....." << std::endl;
current_batch = 0;
container->declare_transaction(connection, *this);
session.declare_transaction(*this);
}

void on_sender_close(proton::sender &s) override {
Expand Down
1 change: 0 additions & 1 deletion cpp/include/proton/container.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,6 @@ class PN_CPP_CLASS_EXTERN container {
/// Cancel task for the given work_handle.
PN_CPP_EXTERN void cancel(work_handle);

PN_CPP_EXTERN transaction declare_transaction(proton::connection conn, proton::transaction_handler &handler, bool settle_before_discharge = false);
private:
/// Declare both v03 and v11 if compiling with c++11 as the library contains both.
/// A C++11 user should never call the v03 overload so it is private in this case
Expand Down
2 changes: 2 additions & 0 deletions cpp/include/proton/session.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ PN_CPP_CLASS_EXTERN session : public internal::object<pn_session_t>, public endp
/// Get user data from this session.
PN_CPP_EXTERN void* user_data() const;

PN_CPP_EXTERN transaction declare_transaction(proton::transaction_handler &handler, bool settle_before_discharge = false);

/// @cond INTERNAL
friend class internal::factory<session>;
friend class session_iterator;
Expand Down
1 change: 0 additions & 1 deletion cpp/include/proton/tracker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include "./binary.hpp"
#include "./internal/export.hpp"
#include "./transfer.hpp"
#include "./messaging_handler.hpp"

/// @file
/// @copybrief proton::tracker
Expand Down
6 changes: 4 additions & 2 deletions cpp/include/proton/transaction.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class transaction_impl {

void discharge(bool failed);
void release_pending();
void accept(tracker &d);
void accept(delivery &d);
void update(tracker &d, uint64_t state);
void set_id(binary _id);

Expand Down Expand Up @@ -92,9 +92,11 @@ PN_CPP_CLASS_EXTERN transaction {
PN_CPP_EXTERN void declare();
PN_CPP_EXTERN void handle_outcome(proton::tracker);
PN_CPP_EXTERN proton::tracker send(proton::sender s, proton::message msg);
PN_CPP_EXTERN void accept(delivery &t);
PN_CPP_EXTERN proton::connection connection() const;

friend class transaction_impl;
friend class container::impl;
friend class session;
};

class
Expand Down
30 changes: 15 additions & 15 deletions cpp/include/proton/transfer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,25 @@
/// @copybrief proton::transfer

struct pn_delivery_t;
struct pn_disposition_t;
// struct pn_disposition_t;

namespace proton {

class disposition : public internal::object<pn_disposition_t> {
/// @cond INTERNAL
disposition(pn_disposition_t *d) : internal::object<pn_disposition_t>(d) {}
/// @endcond
// class disposition : public internal::object<pn_disposition_t> {
// /// @cond INTERNAL
// disposition(pn_disposition_t *d) : internal::object<pn_disposition_t>(d) {}
// /// @endcond

public:
/// Create an empty disposition.
disposition() : internal::object<pn_disposition_t>(0) {}
// public:
// /// Create an empty disposition.
// disposition() : internal::object<pn_disposition_t>(0) {}

proton::value data() const;
// proton::value data() const;

/// @cond INTERNAL
friend class internal::factory<disposition>;
/// @endcond
};
// /// @cond INTERNAL
// friend class internal::factory<disposition>;
// /// @endcond
// };

/// The base class for delivery and tracker.
class transfer : public internal::object<pn_delivery_t> {
Expand Down Expand Up @@ -105,8 +105,8 @@ class transfer : public internal::object<pn_delivery_t> {
/// Get user data from this transfer.
PN_CPP_EXTERN void* user_data() const;

PN_CPP_EXTERN disposition remote();
PN_CPP_EXTERN disposition local();
// PN_CPP_EXTERN disposition remote();
// PN_CPP_EXTERN disposition local();

/// @cond INTERNAL
friend class internal::factory<transfer>;
Expand Down
5 changes: 0 additions & 5 deletions cpp/src/container.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
#include "proton/uuid.hpp"

#include "proactor_container_impl.hpp"
#include <vector>

namespace proton {

Expand All @@ -46,10 +45,6 @@ returned<connection> container::connect(const std::string &url) {
return connect(url, connection_options());
}

transaction container::declare_transaction(proton::connection conn, proton::transaction_handler &handler, bool settle_before_discharge) {
return impl_->declare_transaction(conn, handler, settle_before_discharge);
}

returned<sender> container::open_sender(const std::string &url) {
return open_sender(url, proton::sender_options(), connection_options());
}
Expand Down
Loading

0 comments on commit 347cf6e

Please sign in to comment.