diff --git a/api/composer.json b/api/composer.json index 8101fed18..8ec82129d 100644 --- a/api/composer.json +++ b/api/composer.json @@ -21,12 +21,13 @@ "mpdf/mpdf": "8.1.2", "ralouphie/getallheaders": "2.0.5", "slim/slim": "2.6.2", - "stomp-php/stomp-php": "3.0.6", + "php-amqplib/php-amqplib": "^2.0", "symfony/http-foundation": "^5.4", "symfony/filesystem": "^5.4", "mpdf/qrcode": "^1.2", "mtcmedia/dhl-api": "dev-master#9b4b6315", - "maennchen/zipstream-php": "2.1.0" + "maennchen/zipstream-php": "2.1.0", + "phpseclib/bcmath_compat": "^2.0" }, "autoload": { "psr-4": { diff --git a/api/src/Page.php b/api/src/Page.php index c9f293507..f098868f9 100644 --- a/api/src/Page.php +++ b/api/src/Page.php @@ -1109,14 +1109,16 @@ function _submit_zocalo_recipe($recipe, $parameters, $error_code = 500) } - function _send_zocalo_message($zocalo_queue, $zocalo_message, $error_code = 500) + function _send_zocalo_message($rabbitmq_zocalo_vhost, $zocalo_message, $error_code = 500) { global - $zocalo_server, - $zocalo_username, - $zocalo_password; + $rabbitmq_zocalo_host, + $rabbitmq_zocalo_port, + $rabbitmq_zocalo_username, + $rabbitmq_zocalo_password, + $rabbitmq_zocalo_routing_key; - if (empty($zocalo_server) || empty($zocalo_queue)) + if (empty($rabbitmq_zocalo_host) || empty($rabbitmq_zocalo_vhost)) { $message = 'Zocalo server or queue not specified.'; error_log($message); @@ -1129,8 +1131,8 @@ function _send_zocalo_message($zocalo_queue, $zocalo_message, $error_code = 500) try { error_log("Sending message" . var_export($zocalo_message, true)); - $queue = new Queue($zocalo_server, $zocalo_username, $zocalo_password); - $queue->send($zocalo_queue, $zocalo_message, true, $this->user->loginId); + $queue = new Queue($rabbitmq_zocalo_host, $rabbitmq_zocalo_port, $rabbitmq_zocalo_username, $rabbitmq_zocalo_password, $rabbitmq_zocalo_vhost); + $queue->send($zocalo_message, $rabbitmq_zocalo_routing_key); } catch (Exception $e) { diff --git a/api/src/Page/Process.php b/api/src/Page/Process.php index 70fadb66d..d26ccd61b 100644 --- a/api/src/Page/Process.php +++ b/api/src/Page/Process.php @@ -355,7 +355,7 @@ function _add_reprocessing_sweep($args) { function _enqueue() { - global $zocalo_mx_reprocess_queue; + global $rabbitmq_zocalo_vhost; if (!$this->has_arg('PROCESSINGJOBID')) $this->_error('No processing job specified'); @@ -379,7 +379,7 @@ function _enqueue() 'ispyb_process' => intval($this->arg('PROCESSINGJOBID')), ) ); - $this->_send_zocalo_message($zocalo_mx_reprocess_queue, $message); + $this->_send_zocalo_message($rabbitmq_zocalo_vhost, $message); $this->_output(new \stdClass); } diff --git a/api/src/Queue.php b/api/src/Queue.php index 44b334099..4440d45a7 100644 --- a/api/src/Queue.php +++ b/api/src/Queue.php @@ -2,42 +2,34 @@ namespace SynchWeb; -use Stomp\Exception\StompException; -use Stomp\Stomp; +use PhpAmqpLib\Connection\AMQPStreamConnection; +use PhpAmqpLib\Message\AMQPMessage; class Queue { - private $server, $username, $password; + private $host, $port, $username, $password, $vhost; - function __construct($server, $username, $password) + function __construct($host, $port, $username, $password, $vhost) { - $this->server = $server; + $this->host = $host; + $this->port = $port; $this->username = $username; $this->password = $password; + $this->vhost = $vhost; } - function send($queue, array $message, $persistent = false, $login = null) + function send(array $message, $routing_key) { - try { - $connection = new Stomp($this->server); - - $connection->connect($this->username, $this->password); - - $connection->send( - $queue, - json_encode($message, JSON_UNESCAPED_SLASHES), - array( - 'persistent' => ($persistent === true), - 'synchweb.host' => gethostname(), - 'synchweb.user' => $login, - ) - ); - - $connection->disconnect(); - } catch (StompException $e) { - /** @noinspection PhpUnhandledExceptionInspection */ - - throw $e; - } + $connection = new AMQPStreamConnection($this->host, $this->port, $this->username, $this->password, $this->vhost); + $channel = $connection->channel(); + + $msg = new AMQPMessage( + json_encode($message, JSON_UNESCAPED_SLASHES) + ); + + $channel->basic_publish($msg, null, $routing_key); + + $channel->close(); + $connection->close(); } }