108 lines
2.6 KiB
Perl
Raw Normal View History

2021-01-09 20:12:05 +00:00
#!/usr/bin/perl
# use warnings;
# use strict;
use Net::Stomp;
use Net::MQTT::Simple;
use AnyEvent::RabbitMQ;
use Data::Dumper;
# assert($required [, $message])
#
# Dies with "Assertion failed" unless $required is true. When a non-empty
# $message is supplied it is appended in parentheses, e.g.
# "Assertion failed (`FOO' not available)". Returns nothing on success.
#
# The ($;$) prototype is kept so existing call sites parse exactly as before.
sub assert($;$) {
    my ($required, $message) = @_;
    return if $required;

    my $msg = "Assertion failed";
    # Test definedness/length rather than truthiness so that a message of "0"
    # is still reported instead of being silently dropped.
    $msg .= " ($message)" if defined $message && length $message;
    die $msg;
}
# _debug($text)
#
# Writes one line to the currently selected output handle in the form
# "<epoch-seconds> [DEBUG] <text>".
sub _debug($) {
    my ($text) = @_;
    my $now = time;
    printf("%s [%s] %s\n", $now, "DEBUG", $text);
}
### Check required environment variables are present
# Every name listed here must be set to a true value in %ENV; the script
# aborts via assert() on the first one that is missing.
foreach my $name (
    qw(
        AMQ_SERVER
        AMQ_PORT
        AMQ_USER
        AMQ_PASSWORD
        AMQ_TOPICS
        MQTT_SERVER
        MQTT_PUBLISH_BASE
        RABBITMQ_SERVER
        RABBITMQ_PORT
        RABBITMQ_USER
        RABBITMQ_PASSWORD
        RABBITMQ_VHOST
    )
) {
    assert($ENV{$name}, "`$name' not available");
}
## Connect to ActiveMQ
# Build the STOMP client for the broker named by AMQ_SERVER / AMQ_PORT.
my $amq = Net::Stomp->new( { hostname => $ENV{AMQ_SERVER}, port => $ENV{AMQ_PORT} } )
    or die "Failed to connect to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}";
{
    # Log in with AMQ_USER / AMQ_PASSWORD and inspect the broker's reply.
    my $reply = $amq->connect( {
        login    => $ENV{AMQ_USER},
        passcode => $ENV{AMQ_PASSWORD},
        # "client-id" => $conf->{feed}->{"client-id"}
    } );

    # A reply frame is a true value even when the broker rejected the login
    # (it answers with an ERROR frame), so a plain truth test is not enough:
    # only a CONNECTED frame counts as success.
    die "Failed to authenticate to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}"
        unless defined $reply && $reply->command eq 'CONNECTED';
}
## Connect to MQTT
# Create the MQTT client for the broker named by MQTT_SERVER.
# NOTE(review): Net::MQTT::Simple appears to open its socket lazily on first
# publish, in which case the `or die` below would never fire for an
# unreachable broker -- confirm against the module documentation.
_debug("Connecting to mqtt://$ENV{MQTT_SERVER}");
my $mqtt = Net::MQTT::Simple->new($ENV{MQTT_SERVER})
    or die "Failed to connect to mqtt://$ENV{MQTT_SERVER}";
## Connect to RabbitMQ
use Net::RabbitMQ;
my $rabbit = Net::RabbitMQ->new();
_debug("Connecting to rabbitmq://$ENV{RABBITMQ_SERVER}");
$rabbit->connect(
    $ENV{RABBITMQ_SERVER},
    {
        user        => $ENV{RABBITMQ_USER},
        password    => $ENV{RABBITMQ_PASSWORD},
        # RABBITMQ_PORT is a required setting, so honour it here instead of
        # silently falling back to the library's default port.
        port        => $ENV{RABBITMQ_PORT},
        vhost       => $ENV{RABBITMQ_VHOST},
        channel_max => 64,
    },
);
# All publishing in this script goes through channel 1.
_debug("Open rabbitmq channel");
$rabbit->channel_open(1);
## Subscribe to ActiveMQ topics
# AMQ_TOPICS is a comma-separated list; whitespace around the commas is
# ignored. Each entry is subscribed as /topic/<entry> with client-side
# acknowledgement. A failed subscription only produces a warning.
for my $name (split /\s*,\s*/, $ENV{AMQ_TOPICS}) {
    _debug("Subscribing to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}/topic/$name");

    my $subscription = {
        destination => "/topic/$name",
        ack         => 'client',
    };

    unless ($amq->subscribe($subscription)) {
        warn "Unable to subscribe to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}/topic/$name";
    }
}
## Relay loop
# Receive frames from ActiveMQ and republish each body to MQTT (retained)
# and to RabbitMQ, then acknowledge the frame to ActiveMQ. The frame is
# acknowledged only after both publishes have been issued.
while (1) {
    my $incoming = $amq->receive_frame;
    if (!defined $incoming) {
        _debug("Connection issues?");
        # Pause briefly so a persistently failing connection does not turn
        # this loop into a busy spin that floods the log.
        sleep 1;
        next;
    }

    my $destination = $incoming->destination;
    if (!defined $destination) {
        # NOTE(review): frames without a destination are presumably broker
        # ERROR/RECEIPT frames rather than messages; relaying and acking
        # them makes no sense, so they are skipped -- confirm.
        _debug("Ignoring frame without destination");
        next;
    }

    # Drop the leading "/<first-segment>/" (e.g. "/topic/") so that only the
    # remainder of the destination is appended to the MQTT base topic.
    (my $topic_suffix = $destination) =~ s~^/[^/]*/~~;
    my $mqtt_topic = $ENV{MQTT_PUBLISH_BASE} . "/" . $topic_suffix;
    my $body       = $incoming->body;

    _debug("AMQ -> " . $destination);
    _debug("MQTT <- " . $mqtt_topic);
    $mqtt->retain($mqtt_topic, $body);

    _debug("RABBIT <- " . $destination);
    # NOTE(review): the second argument is hard-coded to "test" although the
    # log line above reports the AMQ destination -- confirm this is intended.
    $rabbit->publish(1, "test", $body);

    $amq->ack({ frame => $incoming });
}

# Not reached while the loop above has no exit condition; kept so the STOMP
# session is closed cleanly if one is ever added.
$amq->disconnect();