2021-01-23 16:44:00 +00:00
|
|
|
|
#!/usr/bin/env perl
|
2021-01-09 20:12:05 +00:00
|
|
|
|
|
|
|
|
|
# use warnings;
|
2021-01-23 16:44:00 +00:00
|
|
|
|
use strict;
|
2021-01-09 20:12:05 +00:00
|
|
|
|
|
|
|
|
|
use Net::Stomp;
|
|
|
|
|
use Net::MQTT::Simple;
|
2021-01-10 08:28:57 +00:00
|
|
|
|
use Net::RabbitMQ;
|
2021-01-09 20:12:05 +00:00
|
|
|
|
|
|
|
|
|
use Data::Dumper;
|
|
|
|
|
|
|
|
|
|
sub assert($;$) {
|
|
|
|
|
my ($required, $message) = @_;
|
|
|
|
|
unless ($required) {
|
|
|
|
|
my $msg = "Assertion failed";
|
|
|
|
|
$msg .= " ($message)" if $message;
|
|
|
|
|
die $msg;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
sub _debug($) {
|
|
|
|
|
my $msg = shift;
|
2021-01-10 08:28:57 +00:00
|
|
|
|
return unless $ENV{DEBUG};
|
2021-01-09 20:12:05 +00:00
|
|
|
|
printf("%s [%s] %s\n", time, "DEBUG", $msg);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
### Check required environement variables are present
|
|
|
|
|
foreach (
|
|
|
|
|
qw(
|
|
|
|
|
AMQ_SERVER
|
|
|
|
|
AMQ_PORT
|
|
|
|
|
AMQ_USER
|
|
|
|
|
AMQ_PASSWORD
|
|
|
|
|
AMQ_TOPICS
|
|
|
|
|
|
|
|
|
|
MQTT_SERVER
|
|
|
|
|
MQTT_PUBLISH_BASE
|
2021-01-09 20:47:43 +00:00
|
|
|
|
|
|
|
|
|
RABBITMQ_SERVER
|
|
|
|
|
RABBITMQ_PORT
|
|
|
|
|
RABBITMQ_USER
|
|
|
|
|
RABBITMQ_PASSWORD
|
|
|
|
|
RABBITMQ_VHOST
|
2021-01-09 20:12:05 +00:00
|
|
|
|
)
|
|
|
|
|
) {
|
|
|
|
|
assert($ENV{$_}, "`$_' not available");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
## Connect to ActiveMQ
|
|
|
|
|
my $amq = Net::Stomp->new( { hostname => $ENV{AMQ_SERVER}, port => $ENV{AMQ_PORT} } ) or die "Failed to connect to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}";
|
|
|
|
|
$amq->connect( {
|
|
|
|
|
login => $ENV{AMQ_USER},
|
|
|
|
|
passcode => $ENV{AMQ_PASSWORD},
|
|
|
|
|
# "client-id" => $conf->{feed}->{"client-id"}
|
|
|
|
|
} ) or die "Failed to authenticate to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}";
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
## Connect to MQTT
|
2021-01-09 20:47:43 +00:00
|
|
|
|
_debug("Connecting to mqtt://$ENV{MQTT_SERVER}");
|
2021-01-09 20:12:05 +00:00
|
|
|
|
my $mqtt = Net::MQTT::Simple->new($ENV{MQTT_SERVER}) or die "Failed to connect to mqtt://$ENV{MQTT_SERVER}";
|
|
|
|
|
|
2021-01-24 14:32:31 +00:00
|
|
|
|
my $rabbit;
|
2021-01-10 08:28:57 +00:00
|
|
|
|
my %rabbit_queues;
|
2021-01-24 14:32:31 +00:00
|
|
|
|
unless ($ENV{DISABLE_RABBIT}) {
|
|
|
|
|
$rabbit = Net::RabbitMQ->new();
|
|
|
|
|
_debug("Connecting to rabbitmq://$ENV{RABBITMQ_SERVER}");
|
|
|
|
|
$rabbit->connect($ENV{RABBITMQ_SERVER},
|
|
|
|
|
{
|
|
|
|
|
user => $ENV{RABBITMQ_USER},
|
|
|
|
|
password => $ENV{RABBITMQ_PASSWORD},
|
|
|
|
|
vhost => $ENV{RABBITMQ_VHOST},
|
|
|
|
|
channel_max => 64,
|
|
|
|
|
});
|
|
|
|
|
_debug("Open rabbitmq channel");
|
|
|
|
|
$rabbit->channel_open(1);
|
|
|
|
|
}
|
2021-01-09 20:47:43 +00:00
|
|
|
|
|
2021-01-09 20:12:05 +00:00
|
|
|
|
## Subscribe to ActiveMQ topics
|
|
|
|
|
foreach my $topic (split (/\s*,\s*/, $ENV{AMQ_TOPICS})) {
|
|
|
|
|
_debug("Subscribing to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}/topic/$topic");
|
|
|
|
|
$amq->subscribe( {
|
|
|
|
|
destination => "/topic/$topic",
|
|
|
|
|
ack => 'client',
|
|
|
|
|
|
|
|
|
|
} ) or warn "Unable to subscribe to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}/topic/$topic";
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
while(1) {
|
|
|
|
|
my $incoming = $amq->receive_frame;
|
|
|
|
|
if (!defined $incoming) {
|
|
|
|
|
_debug("Connection issues?");
|
|
|
|
|
next;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
my $incoming_destination = $incoming->destination;
|
|
|
|
|
$incoming_destination =~ s~^/[^/]*/~~;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_debug("AMQ -> " . $incoming->destination);
|
|
|
|
|
_debug("MQTT <- " . $ENV{MQTT_PUBLISH_BASE} . "/" . $incoming_destination);
|
|
|
|
|
$mqtt->retain($ENV{MQTT_PUBLISH_BASE} . "/" . $incoming_destination, $incoming->body);
|
|
|
|
|
|
2021-01-24 14:32:31 +00:00
|
|
|
|
unless($ENV{DISABLE_RABBIT}) {
|
|
|
|
|
_debug("RABBIT <- " . $incoming->destination);
|
|
|
|
|
unless($rabbit_queues{$incoming_destination}) {
|
|
|
|
|
_debug("RABBIT Declaring queue $incoming_destination");
|
|
|
|
|
$rabbit->queue_declare(1, $incoming_destination, { durable => 1 });
|
|
|
|
|
$rabbit_queues{$incoming_destination}++;
|
|
|
|
|
}
|
|
|
|
|
$rabbit->publish(1, $incoming_destination, $incoming->body);
|
|
|
|
|
}
|
|
|
|
|
# $amq->ack({ frame => $incoming });
|
2021-01-09 20:12:05 +00:00
|
|
|
|
|
2021-01-09 20:47:43 +00:00
|
|
|
|
}
|
|
|
|
|
|
2021-01-23 16:44:00 +00:00
|
|
|
|
$amq->disconnect();
|