Working rabbit sender

This commit is contained in:
Matthew Slowe 2021-01-10 08:28:57 +00:00
parent 5bae47c1cf
commit 064e7cf716

View File

@ -5,7 +5,7 @@
use Net::Stomp;
use Net::MQTT::Simple;
use AnyEvent::RabbitMQ;
use Net::RabbitMQ;
use Data::Dumper;
@ -20,6 +20,7 @@ sub assert($;$) {
sub _debug($) {
my $msg = shift;
return unless $ENV{DEBUG};
printf("%s [%s] %s\n", time, "DEBUG", $msg);
}
@ -59,7 +60,6 @@ $amq->connect( {
_debug("Connecting to mqtt://$ENV{MQTT_SERVER}");
my $mqtt = Net::MQTT::Simple->new($ENV{MQTT_SERVER}) or die "Failed to connect to mqtt://$ENV{MQTT_SERVER}";
use Net::RabbitMQ;
my $rabbit = Net::RabbitMQ->new();
_debug("Connecting to rabbitmq://$ENV{RABBITMQ_SERVER}");
$rabbit->connect($ENV{RABBITMQ_SERVER},
@ -71,6 +71,7 @@ $rabbit->connect($ENV{RABBITMQ_SERVER},
});
_debug("Open rabbitmq channel");
$rabbit->channel_open(1);
my %rabbit_queues;
## Subscribe to ActiveMQ topics
foreach my $topic (split (/\s*,\s*/, $ENV{AMQ_TOPICS})) {
@ -99,7 +100,12 @@ while(1) {
$mqtt->retain($ENV{MQTT_PUBLISH_BASE} . "/" . $incoming_destination, $incoming->body);
_debug("RABBIT <- " . $incoming->destination);
$rabbit->publish(1, "test", $incoming->body);
unless($rabbit_queues{$incoming_destination}) {
_debug("RABBIT Declaring queue $incoming_destination");
$rabbit->queue_declare(1, $incoming_destination);
$rabbit_queues{$incoming_destination}++;
}
$rabbit->publish(1, $incoming_destination, $incoming->body);
$amq->ack({ frame => $incoming });