-
Notifications
You must be signed in to change notification settings - Fork 212
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PROTON-1442: [Cpp] Support for local transactions #437
base: main
Are you sure you want to change the base?
Changes from all commits
04eb16e
33c306d
baa68ce
ea79f47
ed624cd
0e20334
eb4514b
347cf6e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
/* | ||
* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
* | ||
*/ | ||
|
||
#include "options.hpp" | ||
|
||
#include <proton/connection.hpp> | ||
#include <proton/container.hpp> | ||
#include <proton/message.hpp> | ||
#include <proton/message_id.hpp> | ||
#include <proton/messaging_handler.hpp> | ||
#include <proton/types.hpp> | ||
#include <proton/transaction.hpp> | ||
|
||
#include <iostream> | ||
#include <map> | ||
#include <string> | ||
|
||
#include <chrono> | ||
#include <thread> | ||
|
||
class tx_recv : public proton::messaging_handler, proton::transaction_handler { | ||
private: | ||
proton::receiver receiver; | ||
std::string url; | ||
int expected; | ||
int batch_size; | ||
int current_batch = 0; | ||
int committed = 0; | ||
|
||
proton::session session; | ||
proton::transaction transaction; | ||
public: | ||
tx_recv(const std::string &s, int c, int b): | ||
url(s), expected(c), batch_size(b) {} | ||
|
||
void on_container_start(proton::container &c) override { | ||
receiver = c.open_receiver(url); | ||
} | ||
|
||
void on_session_open(proton::session &s) override { | ||
session = s; | ||
std::cout << " [on_session_open] declare_txn started..." << std::endl; | ||
s.declare_transaction(*this); | ||
std::cout << " [on_session_open] declare_txn ended..." << std::endl; | ||
} | ||
|
||
void on_transaction_declare_failed(proton::transaction) {} | ||
void on_transaction_commit_failed(proton::transaction t) { | ||
std::cout << "Transaction Commit Failed" << std::endl; | ||
t.connection().close(); | ||
exit(-1); | ||
} | ||
|
||
void on_transaction_declared(proton::transaction t) override { | ||
std::cout << "[on_transaction_declared] txn called " << (&t) | ||
<< std::endl; | ||
std::cout << "[on_transaction_declared] txn is_empty " << (t.is_empty()) | ||
<< "\t" << transaction.is_empty() << std::endl; | ||
receiver.add_credit(batch_size); | ||
transaction = t; | ||
} | ||
|
||
void on_message(proton::delivery &d, proton::message &msg) override { | ||
std::cout<<"# MESSAGE: " << msg.id() <<": " << msg.body() << std::endl; | ||
transaction.accept(d); | ||
current_batch += 1; | ||
if(current_batch == batch_size) { | ||
transaction = proton::transaction(); // null | ||
} | ||
} | ||
|
||
void on_transaction_committed(proton::transaction t) override { | ||
committed += current_batch; | ||
current_batch = 0; | ||
std::cout<<" [OnTxnCommitted] Committed:"<< committed<< std::endl; | ||
if(committed == expected) { | ||
std::cout << "All messages committed" << std::endl; | ||
t.connection().close(); | ||
} | ||
else { | ||
session.declare_transaction(*this); | ||
} | ||
} | ||
|
||
}; | ||
|
||
int main(int argc, char **argv) { | ||
std::string address("127.0.0.1:5672/examples"); | ||
int message_count = 9; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would expect the defaults to be the same for tx_send and tx_recv (currently tx_send defaults to 6 messages while tx_recv to 9) |
||
int batch_size = 3; | ||
example::options opts(argc, argv); | ||
|
||
opts.add_value(address, 'a', "address", "connect and send to URL", "URL"); | ||
opts.add_value(message_count, 'm', "messages", "number of messages to send", "COUNT"); | ||
opts.add_value(batch_size, 'b', "batch_size", "number of messages in each transaction", "BATCH_SIZE"); | ||
|
||
try { | ||
opts.parse(); | ||
|
||
tx_recv recv(address, message_count, batch_size); | ||
proton::container(recv).run(); | ||
|
||
return 0; | ||
} catch (const example::bad_option& e) { | ||
std::cout << opts << std::endl << e.what() << std::endl; | ||
} catch (const std::exception& e) { | ||
std::cerr << e.what() << std::endl; | ||
} | ||
|
||
return 1; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,175 @@ | ||
/* | ||
* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
* | ||
*/ | ||
|
||
#include "options.hpp" | ||
|
||
#include <proton/connection.hpp> | ||
#include <proton/container.hpp> | ||
#include <proton/message.hpp> | ||
#include <proton/message_id.hpp> | ||
#include <proton/messaging_handler.hpp> | ||
#include <proton/types.hpp> | ||
#include <proton/transaction.hpp> | ||
|
||
#include <iostream> | ||
#include <map> | ||
#include <string> | ||
|
||
#include <chrono> | ||
#include <thread> | ||
|
||
class tx_send : public proton::messaging_handler, proton::transaction_handler { | ||
private: | ||
proton::sender sender; | ||
std::string url; | ||
int total; | ||
int batch_size; | ||
int sent; | ||
int batch_index = 0; | ||
int current_batch = 0; | ||
int committed = 0; | ||
int confirmed = 0; | ||
|
||
proton::session session; | ||
proton::transaction transaction; | ||
public: | ||
tx_send(const std::string &s, int c, int b): | ||
url(s), total(c), batch_size(b), sent(0) {} | ||
|
||
void on_container_start(proton::container &c) override { | ||
sender = c.open_sender(url); | ||
} | ||
|
||
void on_session_open(proton::session &s) override { | ||
session = s; | ||
std::cout << " [on_session_open] declare_txn started..." << std::endl; | ||
s.declare_transaction(*this); | ||
std::cout << " [on_session_open] declare_txn ended..." << std::endl; | ||
} | ||
|
||
void on_transaction_declare_failed(proton::transaction) {} | ||
void on_transaction_commit_failed(proton::transaction t) { | ||
std::cout << "Transaction Commit Failed" << std::endl; | ||
t.connection().close(); | ||
exit(-1); | ||
} | ||
|
||
void on_transaction_declared(proton::transaction t) override { | ||
std::cout << "[on_transaction_declared] txn called " << (&t) | ||
<< std::endl; | ||
std::cout << "[on_transaction_declared] txn is_empty " << (t.is_empty()) | ||
<< "\t" << transaction.is_empty() << std::endl; | ||
transaction = t; | ||
|
||
send(sender); | ||
} | ||
|
||
void on_sendable(proton::sender &s) override { | ||
std::cout << " [OnSendable] transaction: " << &transaction | ||
<< std::endl; | ||
send(s); | ||
} | ||
|
||
void send(proton::sender &s) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sender is defined as tx_recv class attribute, so I believe we don't need to pass the sender to send method. |
||
static int unique_id = 10000; | ||
while (!transaction.is_empty() && sender.credit() && | ||
(committed + current_batch) < total) { | ||
proton::message msg; | ||
std::map<std::string, int> m; | ||
m["sequence"] = committed + current_batch; | ||
|
||
msg.id(unique_id++); | ||
msg.body(m); | ||
std::cout << "##### [example] transaction send msg: " << msg | ||
<< std::endl; | ||
transaction.send(sender, msg); | ||
current_batch += 1; | ||
if(current_batch == batch_size) | ||
{ | ||
std::cout << " >> Txn attempt commit" << std::endl; | ||
if (batch_index % 2 == 0) { | ||
transaction.commit(); | ||
} else { | ||
transaction.abort(); | ||
} | ||
|
||
transaction = proton::transaction(); | ||
batch_index++; | ||
} | ||
} | ||
} | ||
|
||
void on_tracker_accept(proton::tracker &t) override { | ||
confirmed += 1; | ||
std::cout << " [example] on_tracker_accept:" << confirmed | ||
<< std::endl; | ||
} | ||
|
||
void on_transaction_committed(proton::transaction t) override { | ||
committed += current_batch; | ||
current_batch = 0; | ||
std::cout<<" [OnTxnCommitted] Committed:"<< committed<< std::endl; | ||
if(committed == total) { | ||
std::cout << "All messages committed" << std::endl; | ||
t.connection().close(); | ||
} | ||
else { | ||
session.declare_transaction(*this); | ||
} | ||
} | ||
|
||
void on_transaction_aborted(proton::transaction t) override { | ||
std::cout << "Meesages Aborted ....." << std::endl; | ||
current_batch = 0; | ||
session.declare_transaction(*this); | ||
} | ||
|
||
void on_sender_close(proton::sender &s) override { | ||
current_batch = 0; | ||
} | ||
|
||
}; | ||
|
||
int main(int argc, char **argv) { | ||
std::string address("127.0.0.1:5672/examples"); | ||
int message_count = 6; | ||
int batch_size = 3; | ||
example::options opts(argc, argv); | ||
|
||
opts.add_value(address, 'a', "address", "connect and send to URL", "URL"); | ||
opts.add_value(message_count, 'm', "messages", "number of messages to send", "COUNT"); | ||
opts.add_value(batch_size, 'b', "batch_size", "number of messages in each transaction", "BATCH_SIZE"); | ||
|
||
try { | ||
opts.parse(); | ||
|
||
tx_send send(address, message_count, batch_size); | ||
proton::container(send).run(); | ||
|
||
return 0; | ||
} catch (const example::bad_option& e) { | ||
std::cout << opts << std::endl << e.what() << std::endl; | ||
} catch (const std::exception& e) { | ||
std::cerr << e.what() << std::endl; | ||
} | ||
|
||
return 1; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -88,6 +88,8 @@ class target_options { | |
/// **Unsettled API** Set the dynamic node properties. | ||
PN_CPP_EXTERN target_options& dynamic_properties(const target::dynamic_property_map&); | ||
|
||
PN_CPP_EXTERN target_options& type(const int); | ||
|
||
Comment on lines
+91
to
+92
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should not be needed. Introduce a new coordinator class that is peer to sender and receiver |
||
private: | ||
void apply(target&) const; | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe we should do rather commit here, that way it works as expected and the receiver is closed after expected number of messages received (with the current implementation it's not).