Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

LIMS-1354: Migrate from ActiveMQ to RabbitMQ #826

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions api/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@
"mpdf/mpdf": "8.1.2",
"ralouphie/getallheaders": "2.0.5",
"slim/slim": "2.6.2",
"stomp-php/stomp-php": "3.0.6",
"php-amqplib/php-amqplib": "^2.0",
"symfony/http-foundation": "^5.4",
"symfony/filesystem": "^5.4",
"mpdf/qrcode": "^1.2",
"mtcmedia/dhl-api": "dev-master#9b4b6315",
"maennchen/zipstream-php": "2.1.0"
"maennchen/zipstream-php": "2.1.0",
"phpseclib/bcmath_compat": "^2.0"
},
"autoload": {
"psr-4": {
Expand Down
16 changes: 9 additions & 7 deletions api/src/Page.php
Original file line number Diff line number Diff line change
Expand Up @@ -1109,14 +1109,16 @@ function _submit_zocalo_recipe($recipe, $parameters, $error_code = 500)
}


function _send_zocalo_message($zocalo_queue, $zocalo_message, $error_code = 500)
function _send_zocalo_message($rabbitmq_zocalo_vhost, $zocalo_message, $error_code = 500)
{
global
$zocalo_server,
$zocalo_username,
$zocalo_password;
$rabbitmq_zocalo_host,
$rabbitmq_zocalo_port,
$rabbitmq_zocalo_username,
$rabbitmq_zocalo_password,
$rabbitmq_zocalo_routing_key;

if (empty($zocalo_server) || empty($zocalo_queue))
if (empty($rabbitmq_zocalo_host) || empty($rabbitmq_zocalo_vhost))
{
$message = 'Zocalo server or queue not specified.';
error_log($message);
Expand All @@ -1129,8 +1131,8 @@ function _send_zocalo_message($zocalo_queue, $zocalo_message, $error_code = 500)
try
{
error_log("Sending message" . var_export($zocalo_message, true));
$queue = new Queue($zocalo_server, $zocalo_username, $zocalo_password);
$queue->send($zocalo_queue, $zocalo_message, true, $this->user->loginId);
$queue = new Queue($rabbitmq_zocalo_host, $rabbitmq_zocalo_port, $rabbitmq_zocalo_username, $rabbitmq_zocalo_password, $rabbitmq_zocalo_vhost);
$queue->send($zocalo_message, $rabbitmq_zocalo_routing_key);
}
catch (Exception $e)
{
Expand Down
4 changes: 2 additions & 2 deletions api/src/Page/Process.php
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ function _add_reprocessing_sweep($args) {

function _enqueue()
{
global $zocalo_mx_reprocess_queue;
global $rabbitmq_zocalo_vhost;

if (!$this->has_arg('PROCESSINGJOBID')) $this->_error('No processing job specified');

Expand All @@ -379,7 +379,7 @@ function _enqueue()
'ispyb_process' => intval($this->arg('PROCESSINGJOBID')),
)
);
$this->_send_zocalo_message($zocalo_mx_reprocess_queue, $message);
$this->_send_zocalo_message($rabbitmq_zocalo_vhost, $message);

$this->_output(new \stdClass);
}
Expand Down
46 changes: 19 additions & 27 deletions api/src/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,34 @@

namespace SynchWeb;

use Stomp\Exception\StompException;
use Stomp\Stomp;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class Queue
{
private $server, $username, $password;
private $host, $port, $username, $password, $vhost;

function __construct($server, $username, $password)
function __construct($host, $port, $username, $password, $vhost)
{
$this->server = $server;
$this->host = $host;
$this->port = $port;
$this->username = $username;
$this->password = $password;
$this->vhost = $vhost;
}

function send($queue, array $message, $persistent = false, $login = null)
function send(array $message, $routing_key)
{
try {
$connection = new Stomp($this->server);

$connection->connect($this->username, $this->password);

$connection->send(
$queue,
json_encode($message, JSON_UNESCAPED_SLASHES),
array(
'persistent' => ($persistent === true),
'synchweb.host' => gethostname(),
'synchweb.user' => $login,
)
);

$connection->disconnect();
} catch (StompException $e) {
/** @noinspection PhpUnhandledExceptionInspection */

throw $e;
}
$connection = new AMQPStreamConnection($this->host, $this->port, $this->username, $this->password, $this->vhost);
$channel = $connection->channel();

$msg = new AMQPMessage(
json_encode($message, JSON_UNESCAPED_SLASHES)
);

$channel->basic_publish($msg, null, $routing_key);

$channel->close();
$connection->close();
}
}
Loading