@@ -8351,3 +8351,86 @@ TEST(MaxTimeoutTest, ContentStreamSSL) {
83518351 max_timeout_test (svr, cli, timeout, threshold);
83528352}
83538353#endif
8354+
8355+ class EventDispatcher {
8356+ public:
8357+ EventDispatcher () {}
8358+
8359+ void wait_event (DataSink *sink) {
8360+ unique_lock<mutex> lk (m_);
8361+ int id = id_;
8362+ cv_.wait (lk, [&] { return cid_ == id; });
8363+ sink->write (message_.data (), message_.size ());
8364+ }
8365+
8366+ void send_event (const string &message) {
8367+ lock_guard<mutex> lk (m_);
8368+ cid_ = id_++;
8369+ message_ = message;
8370+ cv_.notify_all ();
8371+ }
8372+
8373+ private:
8374+ mutex m_;
8375+ condition_variable cv_;
8376+ atomic_int id_{0 };
8377+ atomic_int cid_{-1 };
8378+ string message_;
8379+ };
8380+
8381+ TEST (ClientInThreadTest, Issue2068) {
8382+ EventDispatcher ed;
8383+
8384+ Server svr;
8385+ svr.Get (" /event1" , [&](const Request & /* req*/ , Response &res) {
8386+ res.set_chunked_content_provider (" text/event-stream" ,
8387+ [&](size_t /* offset*/ , DataSink &sink) {
8388+ ed.wait_event (&sink);
8389+ return true ;
8390+ });
8391+ });
8392+
8393+ auto listen_thread = std::thread ([&svr]() { svr.listen (HOST, PORT); });
8394+
8395+ svr.wait_until_ready ();
8396+
8397+ thread event_thread ([&] {
8398+ int id = 0 ;
8399+ while (svr.is_running ()) {
8400+ this_thread::sleep_for (chrono::milliseconds (500 ));
8401+
8402+ std::stringstream ss;
8403+ ss << " data: " << id << " \n\n " ;
8404+ ed.send_event (ss.str ());
8405+ id++;
8406+ }
8407+ });
8408+
8409+ auto se = detail::scope_exit ([&] {
8410+ svr.stop ();
8411+
8412+ listen_thread.join ();
8413+ event_thread.join ();
8414+
8415+ ASSERT_FALSE (svr.is_running ());
8416+ });
8417+
8418+ {
8419+ auto client = detail::make_unique<Client>(HOST, PORT);
8420+ client->set_read_timeout (std::chrono::minutes (10 ));
8421+
8422+ std::atomic<bool > stop{false };
8423+
8424+ std::thread t ([&] {
8425+ client->Get (" /event1" ,
8426+ [&](const char *, size_t ) -> bool { return !stop; });
8427+ });
8428+
8429+ std::this_thread::sleep_for (std::chrono::seconds (2 ));
8430+ stop = true ;
8431+ client->stop ();
8432+ client.reset ();
8433+
8434+ t.join ();
8435+ }
8436+ }
0 commit comments