2021-01-24 14:37:46 +00:00

119 lines
3.1 KiB
Perl
Executable File
Raw Permalink Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env perl
# use warnings;
use strict;
use Net::Stomp;
use Net::MQTT::Simple;
use Net::RabbitMQ;
use Data::Dumper;
sub assert($;$) {
my ($required, $message) = @_;
unless ($required) {
my $msg = "Assertion failed";
$msg .= " ($message)" if $message;
die $msg;
}
}
sub _debug($) {
my $msg = shift;
return unless $ENV{DEBUG};
printf("%s [%s] %s\n", time, "DEBUG", $msg);
}
### Check required environement variables are present
foreach (
qw(
AMQ_SERVER
AMQ_PORT
AMQ_USER
AMQ_PASSWORD
AMQ_TOPICS
MQTT_SERVER
MQTT_PUBLISH_BASE
RABBITMQ_SERVER
RABBITMQ_PORT
RABBITMQ_USER
RABBITMQ_PASSWORD
RABBITMQ_VHOST
)
) {
assert($ENV{$_}, "`$_' not available");
}
## Connect to ActiveMQ
my $amq = Net::Stomp->new( { hostname => $ENV{AMQ_SERVER}, port => $ENV{AMQ_PORT} } ) or die "Failed to connect to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}";
$amq->connect( {
login => $ENV{AMQ_USER},
passcode => $ENV{AMQ_PASSWORD},
# "client-id" => $conf->{feed}->{"client-id"}
} ) or die "Failed to authenticate to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}";
## Connect to MQTT
_debug("Connecting to mqtt://$ENV{MQTT_SERVER}");
my $mqtt = Net::MQTT::Simple->new($ENV{MQTT_SERVER}) or die "Failed to connect to mqtt://$ENV{MQTT_SERVER}";
my $rabbit;
my %rabbit_queues;
unless ($ENV{DISABLE_RABBIT}) {
$rabbit = Net::RabbitMQ->new();
_debug("Connecting to rabbitmq://$ENV{RABBITMQ_SERVER}");
$rabbit->connect($ENV{RABBITMQ_SERVER},
{
user => $ENV{RABBITMQ_USER},
password => $ENV{RABBITMQ_PASSWORD},
vhost => $ENV{RABBITMQ_VHOST},
channel_max => 64,
});
_debug("Open rabbitmq channel");
$rabbit->channel_open(1);
}
## Subscribe to ActiveMQ topics
foreach my $topic (split (/\s*,\s*/, $ENV{AMQ_TOPICS})) {
_debug("Subscribing to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}/topic/$topic");
$amq->subscribe( {
destination => "/topic/$topic",
ack => 'client',
} ) or warn "Unable to subscribe to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}/topic/$topic";
}
while(1) {
my $incoming = $amq->receive_frame;
if (!defined $incoming) {
_debug("Connection issues?");
next;
}
my $incoming_destination = $incoming->destination;
$incoming_destination =~ s~^/[^/]*/~~;
_debug("AMQ -> " . $incoming->destination);
_debug("MQTT <- " . $ENV{MQTT_PUBLISH_BASE} . "/" . $incoming_destination);
$mqtt->retain($ENV{MQTT_PUBLISH_BASE} . "/" . $incoming_destination, $incoming->body);
unless($ENV{DISABLE_RABBIT}) {
_debug("RABBIT <- " . $incoming->destination);
unless($rabbit_queues{$incoming_destination}) {
_debug("RABBIT Declaring queue $incoming_destination");
$rabbit->queue_declare(1, $incoming_destination, { durable => 1 });
$rabbit_queues{$incoming_destination}++;
}
$rabbit->publish(1, $incoming_destination, $incoming->body);
}
# $amq->ack({ frame => $incoming });
}
$amq->disconnect();