Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subscription->unsubscribe() throws "Subscription not found" exception #7

Open
tommyseus opened this issue Jun 12, 2018 · 5 comments
Open

Comments

@tommyseus
Copy link

tommyseus commented Jun 12, 2018

I have an issue with an unknown subscription.

I believe the nats streaming server sends an response to an UNSUB request. The \Nats\Connection::handleMSG doesn't expect this response and throws a "Subscription not found" exception.

private function closeOrUnsubscribe($doClose)
{
$this->stanCon->natsCon()->unsubscribe($this->sid);

@byrnedo
Copy link
Owner

byrnedo commented Jun 12, 2018

Hi @tommyseus,

What version of nats streaming is it?

Could you explain your use case and flow that triggers this?

@tommyseus
Copy link
Author

I'm sorry, the exception has a different cause.

The error is triggered by the next message. There are 2 messages published. But the subscribe handles only one (wait (1)) and than unsubscribed from the subject. Between wait and unsubscribe the second message has arrieved. I have added a sleep so that the error always occurs for demonstration.

publisher.php :

<?php

require __DIR__ . '/vendor/autoload.php';

ini_set('display_errors', '1');

$options = new Nats\ConnectionOptions();
$options->setHost('da-nats-streaming');

$streamingOptions = new \NatsStreaming\ConnectionOptions();
$streamingOptions->setClientID(uniqid('test_', false));
$streamingOptions->setClusterID("test-cluster");
$streamingOptions->setNatsOptions($options);

$streamingConnection = new \NatsStreaming\Connection($streamingOptions);
$streamingConnection->connect(60);

// publish

for ($i = 1; $i <= 4; ++$i) {
    $request = $streamingConnection->publish('dummy-subject', "publish $i  - " . date('Y-m-d H:i:s'));
    $request->wait();
    var_dump($request->gotAck());
}

subscriber.php :

<?php

require __DIR__ . '/vendor/autoload.php';

ini_set('display_errors', '1');

$options = new Nats\ConnectionOptions();
$options->setHost('da-nats-streaming');

$streamingOptions = new \NatsStreaming\ConnectionOptions();
$streamingOptions->setClientID(uniqid('dummy-client-id'));
$streamingOptions->setClusterID("test-cluster");
$streamingOptions->setNatsOptions($options);

$streamingConnection = new \NatsStreaming\Connection($streamingOptions);
$streamingConnection->connect(60);

// subscribe

$subscriptionOptions = new NatsStreaming\SubscriptionOptions();
$subscriptionOptions->setAckWaitSecs(10);
$subscriptionOptions->setManualAck(true);
$subscriptionOptions->setDurableName('dummy-durable');
$subscriptionOptions->setMaxInFlight(1);

$subscribe = $streamingConnection->queueSubscribe(
    'dummy-subject',
    'abcdefghijkl',
    function ($message) {
        /* @var $message \NatsStreaming\Msg */
        echo 'received: ' . $message->getData() . PHP_EOL;
        $message->ack();
    },
    $subscriptionOptions
);
$subscribe->wait(1);

// time to receive the next message from the nats server
sleep(4);

// triggers a wait and consumes the next message. But throws a exception because the subscriber callback was removed.
$subscribe->unsubscribe();

Versions:
nats-streamin: 0.9.2 (https://hub.docker.com/_/nats-streaming/)
byrnedo/php-nats-streaming: 0.2.4
repejota/nats: 0.8.7

@byrnedo
Copy link
Owner

byrnedo commented Jun 13, 2018 via email

@byrnedo
Copy link
Owner

byrnedo commented Jul 12, 2018

@tommyseus bump, have I grasped the flow?

@byrnedo
Copy link
Owner

byrnedo commented Aug 12, 2018

@tommyseus

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants