#!/usr/bin/perl

# Bridge that forwards messages received from ActiveMQ topics (over STOMP)
# to an MQTT broker, publishing each one as a retained message.
#
# Configuration is read from the environment:
#   AMQ_SERVER, AMQ_PORT    - STOMP endpoint handed to Net::Stomp
#   AMQ_USER, AMQ_PASSWORD  - STOMP credentials
#   AMQ_TOPICS              - comma-separated topic names; each one is
#                             subscribed to as "/topic/<name>"
#   MQTT_SERVER             - handed to Net::MQTT::Simple->new
#   MQTT_PUBLISH_BASE       - prefix for every MQTT topic published to

use strict;
use warnings;

use Net::Stomp;
use Net::MQTT::Simple;
use AnyEvent::RabbitMQ;
use Data::Dumper;

# Seconds to pause after receive_frame comes back empty, so that a persistent
# failure does not turn the main loop into a busy spin that floods the log.
my $RECEIVE_RETRY_DELAY = 1;

# Flush debug lines immediately; this is a long-running process whose output
# would otherwise sit in the stdio buffer when STDOUT is not a terminal.
$| = 1;

# assert($required, $message)
#
# Dies with "Assertion failed", followed by " ($message)" when a message is
# given, unless $required is true. Returns normally otherwise.
sub assert($;$) {
    my ($required, $message) = @_;
    unless ($required) {
        my $msg = "Assertion failed";
        $msg .= " ($message)" if $message;
        die $msg;
    }
}

# _debug($msg)
#
# Prints "<epoch seconds> [DEBUG] <msg>" followed by a newline to STDOUT.
sub _debug($) {
    my $msg = shift;
    printf("%s [%s] %s\n", time, "DEBUG", $msg);
}

### Check required environment variables are present
foreach my $name (
    qw(
        AMQ_SERVER
        AMQ_PORT
        AMQ_USER
        AMQ_PASSWORD
        AMQ_TOPICS
        MQTT_SERVER
        MQTT_PUBLISH_BASE
    )
) {
    # Test for defined-and-non-empty rather than truthiness, so that a
    # legitimate value of "0" is not rejected as missing.
    assert(
        (defined($ENV{$name}) && length($ENV{$name})),
        "`$name' not available"
    );
}

## Connect to ActiveMQ
my $amq = Net::Stomp->new(
    {
        hostname => $ENV{AMQ_SERVER},
        port     => $ENV{AMQ_PORT},
    }
) or die "Failed to connect to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}";

$amq->connect(
    {
        login    => $ENV{AMQ_USER},
        passcode => $ENV{AMQ_PASSWORD},
        # "client-id" => $conf->{feed}->{"client-id"}
    }
) or die "Failed to authenticate to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}";

## Connect to MQTT
my $mqtt = Net::MQTT::Simple->new($ENV{MQTT_SERVER})
    or die "Failed to connect to mqtt://$ENV{MQTT_SERVER}";

## Subscribe to ActiveMQ topics
foreach my $topic (split(/\s*,\s*/, $ENV{AMQ_TOPICS})) {
    _debug("Subscribing to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}/topic/$topic");
    $amq->subscribe(
        {
            destination => "/topic/$topic",
            # Client acknowledgement: each message is ACKed explicitly in
            # the loop below, after it has been handed to the MQTT client.
            ack         => 'client',
        }
    ) or warn "Unable to subscribe to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}/topic/$topic";
}

## Forward frames until the process is killed
while (1) {
    my $incoming = $amq->receive_frame;
    if (!defined $incoming) {
        _debug("Connection issues?");
        # Back off before retrying instead of spinning at full speed.
        sleep $RECEIVE_RETRY_DELAY;
        next;
    }

    # Only MESSAGE frames that carry a destination can be mapped to an MQTT
    # topic. Anything else would otherwise be published to "<base>/" and
    # then ACKed as if it were a message.
    my $command = $incoming->command;
    $command = '' unless defined $command;
    my $amq_destination = $incoming->destination;
    if ($command ne 'MESSAGE' || !defined $amq_destination) {
        _debug("Ignoring non-message frame ($command)");
        next;
    }

    # Drop the leading "/<type>/" segment (e.g. "/topic/") so that only the
    # topic name itself is appended to the MQTT base.
    my $incoming_destination = $amq_destination;
    $incoming_destination =~ s~^/[^/]*/~~;
    my $mqtt_topic = $ENV{MQTT_PUBLISH_BASE} . "/" . $incoming_destination;

    _debug("AMQ -> " . $amq_destination);
    _debug("MQTT <- " . $mqtt_topic);

    $mqtt->retain($mqtt_topic, $incoming->body);
    $amq->ack({ frame => $incoming });
}