#!/usr/bin/perl

# Message bridge: receives frames from one or more ActiveMQ topics (over
# STOMP) and republishes each message body to
#   * an MQTT broker, as a retained message under MQTT_PUBLISH_BASE, and
#   * a RabbitMQ queue named after the topic (declared durable on first use).
# A frame is acknowledged to ActiveMQ only after both publishes were issued.
#
# All configuration comes from environment variables (see the list below).
# Set DEBUG to a true value to get timestamped trace output on STDOUT.

use strict;
use warnings;

use Net::Stomp;
use Net::MQTT::Simple;
use Net::RabbitMQ;
use Data::Dumper;

# assert($required, $message)
#
# Dies with "Assertion failed", followed by " ($message)" when a message is
# given, unless $required is true.
sub assert($;$) {
    my ($required, $message) = @_;
    unless ($required) {
        my $msg = "Assertion failed";
        $msg .= " ($message)" if $message;
        die $msg;
    }
}

# _debug($msg)
#
# Prints "<epoch> [DEBUG] <msg>" to STDOUT when the DEBUG environment
# variable is true; does nothing otherwise.
sub _debug($) {
    my $msg = shift;
    return unless $ENV{DEBUG};
    printf("%s [%s] %s\n", time, "DEBUG", $msg);
}

### Check required environment variables are present
foreach my $name (
    qw(
        AMQ_SERVER
        AMQ_PORT
        AMQ_USER
        AMQ_PASSWORD
        AMQ_TOPICS
        MQTT_SERVER
        MQTT_PUBLISH_BASE
        RABBITMQ_SERVER
        RABBITMQ_PORT
        RABBITMQ_USER
        RABBITMQ_PASSWORD
        RABBITMQ_VHOST
    )
) {
    # Test for "defined and non-empty" rather than truthiness so that a
    # legitimate value of "0" is not rejected.
    assert(
        (defined($ENV{$name}) && length($ENV{$name})),
        "`$name' not available"
    );
}

## Connect to ActiveMQ
my $amq = Net::Stomp->new(
    {
        hostname => $ENV{AMQ_SERVER},
        port     => $ENV{AMQ_PORT},
    }
) or die "Failed to connect to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}";

$amq->connect(
    {
        login    => $ENV{AMQ_USER},
        passcode => $ENV{AMQ_PASSWORD},
        # "client-id" => $conf->{feed}->{"client-id"}
    }
) or die "Failed to authenticate to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}";

## Connect to MQTT
_debug("Connecting to mqtt://$ENV{MQTT_SERVER}");
my $mqtt = Net::MQTT::Simple->new($ENV{MQTT_SERVER})
    or die "Failed to connect to mqtt://$ENV{MQTT_SERVER}";

## Connect to RabbitMQ
my $rabbit = Net::RabbitMQ->new();
_debug("Connecting to rabbitmq://$ENV{RABBITMQ_SERVER}");
$rabbit->connect(
    $ENV{RABBITMQ_SERVER},
    {
        user        => $ENV{RABBITMQ_USER},
        password    => $ENV{RABBITMQ_PASSWORD},
        # RABBITMQ_PORT is required above, so actually honour it here
        # instead of silently using the library default.
        port        => $ENV{RABBITMQ_PORT},
        vhost       => $ENV{RABBITMQ_VHOST},
        channel_max => 64,
    }
);
_debug("Open rabbitmq channel");
$rabbit->channel_open(1);

# Names of the RabbitMQ queues already declared by this process, so each
# queue is declared only once.
my %rabbit_queues;

## Subscribe to ActiveMQ topics
# AMQ_TOPICS is a comma-separated list; whitespace around commas is ignored.
foreach my $topic (split(/\s*,\s*/, $ENV{AMQ_TOPICS})) {
    _debug("Subscribing to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}/topic/$topic");
    $amq->subscribe(
        {
            destination => "/topic/$topic",
            ack         => 'client',
        }
    ) or warn "Unable to subscribe to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}/topic/$topic";
}

## Forward every incoming frame to MQTT and RabbitMQ
while (1) {
    my $incoming = $amq->receive_frame;
    if (!defined $incoming) {
        _debug("Connection issues?");
        # Back off briefly so a persistent failure does not turn this loop
        # into a busy spin.
        sleep 1;
        next;
    }

    # Strip the leading "/<type>/" segment (e.g. "/topic/") to get the bare
    # name used as the MQTT sub-topic and as the RabbitMQ queue name.
    my $incoming_destination = $incoming->destination;
    $incoming_destination =~ s~^/[^/]*/~~;

    _debug("AMQ -> " . $incoming->destination);

    _debug("MQTT <- " . $ENV{MQTT_PUBLISH_BASE} . "/" . $incoming_destination);
    $mqtt->retain(
        $ENV{MQTT_PUBLISH_BASE} . "/" . $incoming_destination,
        $incoming->body
    );

    _debug("RABBIT <- " . $incoming->destination);
    unless ($rabbit_queues{$incoming_destination}) {
        _debug("RABBIT Declaring queue $incoming_destination");
        $rabbit->queue_declare(1, $incoming_destination, { durable => 1 });
        $rabbit_queues{$incoming_destination}++;
    }
    # NOTE(review): no exchange is specified here, so the library's default
    # exchange is used -- confirm the declared queue is actually bound to it.
    $rabbit->publish(1, $incoming_destination, $incoming->body);

    # Acknowledge only after both downstream publishes have been issued.
    $amq->ack({ frame => $incoming });
}

# Only reached if the loop above is ever changed to terminate. The original
# code called disconnect on an undeclared variable ($mq); the STOMP
# connection handle is $amq.
$amq->disconnect();