diff --git a/bridge.pl b/bridge.pl index fb65c95..16afc3f 100755 --- a/bridge.pl +++ b/bridge.pl @@ -5,7 +5,7 @@ use Net::Stomp; use Net::MQTT::Simple; -use AnyEvent::RabbitMQ; +use Net::RabbitMQ; use Data::Dumper; @@ -20,6 +20,7 @@ sub assert($;$) { sub _debug($) { my $msg = shift; + return unless $ENV{DEBUG}; printf("%s [%s] %s\n", time, "DEBUG", $msg); } @@ -59,7 +60,6 @@ $amq->connect( { _debug("Connecting to mqtt://$ENV{MQTT_SERVER}"); my $mqtt = Net::MQTT::Simple->new($ENV{MQTT_SERVER}) or die "Failed to connect to mqtt://$ENV{MQTT_SERVER}"; -use Net::RabbitMQ; my $rabbit = Net::RabbitMQ->new(); _debug("Connecting to rabbitmq://$ENV{RABBITMQ_SERVER}"); $rabbit->connect($ENV{RABBITMQ_SERVER}, @@ -71,6 +71,7 @@ $rabbit->connect($ENV{RABBITMQ_SERVER}, }); _debug("Open rabbitmq channel"); $rabbit->channel_open(1); +my %rabbit_queues; ## Subscribe to ActiveMQ topics foreach my $topic (split (/\s*,\s*/, $ENV{AMQ_TOPICS})) { @@ -99,7 +100,12 @@ while(1) { $mqtt->retain($ENV{MQTT_PUBLISH_BASE} . "/" . $incoming_destination, $incoming->body); _debug("RABBIT <- " . $incoming->destination); - $rabbit->publish(1, "test", $incoming->body); + unless($rabbit_queues{$incoming_destination}) { + _debug("RABBIT Declaring queue $incoming_destination"); + $rabbit->queue_declare(1, $incoming_destination); + $rabbit_queues{$incoming_destination}++; + } + $rabbit->publish(1, $incoming_destination, $incoming->body); $amq->ack({ frame => $incoming });