diff --git a/bridge.pl b/bridge.pl index 36a06ad..3092110 100755 --- a/bridge.pl +++ b/bridge.pl @@ -50,10 +50,10 @@ foreach ( ## Connect to ActiveMQ my $amq = Net::Stomp->new( { hostname => $ENV{AMQ_SERVER}, port => $ENV{AMQ_PORT} } ) or die "Failed to connect to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}"; $amq->connect( { - login => $ENV{AMQ_USER}, - passcode => $ENV{AMQ_PASSWORD}, - # "client-id" => $conf->{feed}->{"client-id"} - } ) or die "Failed to authenticate to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}"; + login => $ENV{AMQ_USER}, + passcode => $ENV{AMQ_PASSWORD}, + # "client-id" => $conf->{feed}->{"client-id"} + } ) or die "Failed to authenticate to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}"; ## Connect to MQTT @@ -63,27 +63,27 @@ my $mqtt = Net::MQTT::Simple->new($ENV{MQTT_SERVER}) or die "Failed to connect t my $rabbit; my %rabbit_queues; unless ($ENV{DISABLE_RABBIT}) { - $rabbit = Net::RabbitMQ->new(); - _debug("Connecting to rabbitmq://$ENV{RABBITMQ_SERVER}"); - $rabbit->connect($ENV{RABBITMQ_SERVER}, - { - user => $ENV{RABBITMQ_USER}, - password => $ENV{RABBITMQ_PASSWORD}, - vhost => $ENV{RABBITMQ_VHOST}, - channel_max => 64, - }); - _debug("Open rabbitmq channel"); - $rabbit->channel_open(1); + $rabbit = Net::RabbitMQ->new(); + _debug("Connecting to rabbitmq://$ENV{RABBITMQ_SERVER}"); + $rabbit->connect($ENV{RABBITMQ_SERVER}, + { + user => $ENV{RABBITMQ_USER}, + password => $ENV{RABBITMQ_PASSWORD}, + vhost => $ENV{RABBITMQ_VHOST}, + channel_max => 64, + }); + _debug("Open rabbitmq channel"); + $rabbit->channel_open(1); } ## Subscribe to ActiveMQ topics foreach my $topic (split (/\s*,\s*/, $ENV{AMQ_TOPICS})) { _debug("Subscribing to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}/topic/$topic"); - $amq->subscribe( { - destination => "/topic/$topic", + $amq->subscribe( { + destination => "/topic/$topic", ack => 'client', - } ) or warn "Unable to subscribe to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}/topic/$topic"; + } ) or warn "Unable to subscribe to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}/topic/$topic"; } @@ -102,15 +102,15 @@ while(1) { _debug("MQTT <- " . $ENV{MQTT_PUBLISH_BASE} . "/" . $incoming_destination); $mqtt->retain($ENV{MQTT_PUBLISH_BASE} . "/" . $incoming_destination, $incoming->body); - unless($ENV{DISABLE_RABBIT}) { - _debug("RABBIT <- " . $incoming->destination); - unless($rabbit_queues{$incoming_destination}) { - _debug("RABBIT Declaring queue $incoming_destination"); - $rabbit->queue_declare(1, $incoming_destination, { durable => 1 }); - $rabbit_queues{$incoming_destination}++; - } - $rabbit->publish(1, $incoming_destination, $incoming->body); - } + unless($ENV{DISABLE_RABBIT}) { + _debug("RABBIT <- " . $incoming->destination); + unless($rabbit_queues{$incoming_destination}) { + _debug("RABBIT Declaring queue $incoming_destination"); + $rabbit->queue_declare(1, $incoming_destination, { durable => 1 }); + $rabbit_queues{$incoming_destination}++; + } + $rabbit->publish(1, $incoming_destination, $incoming->body); + } # $amq->ack({ frame => $incoming }); }