From 3da2288e7b29a853a709d26864be3bb6ba720a75 Mon Sep 17 00:00:00 2001 From: Matthew Slowe Date: Sat, 9 Jan 2021 20:12:05 +0000 Subject: [PATCH] Working activemq -> mqtt proxy --- bridge.pl | 84 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 84 insertions(+) create mode 100755 bridge.pl diff --git a/bridge.pl b/bridge.pl new file mode 100755 index 0000000..0625018 --- /dev/null +++ b/bridge.pl @@ -0,0 +1,84 @@ +#!/usr/bin/perl + +# use warnings; +# use strict; + +use Net::Stomp; +use Net::MQTT::Simple; +use AnyEvent::RabbitMQ; + +use Data::Dumper; + +sub assert($;$) { + my ($required, $message) = @_; + unless ($required) { + my $msg = "Assertion failed"; + $msg .= " ($message)" if $message; + die $msg; + } +} + +sub _debug($) { + my $msg = shift; + printf("%s [%s] %s\n", time, "DEBUG", $msg); +} + +### Check required environement variables are present +foreach ( + qw( + AMQ_SERVER + AMQ_PORT + AMQ_USER + AMQ_PASSWORD + AMQ_TOPICS + + MQTT_SERVER + MQTT_PUBLISH_BASE + ) +) { + assert($ENV{$_}, "`$_' not available"); +} + + +## Connect to ActiveMQ +my $amq = Net::Stomp->new( { hostname => $ENV{AMQ_SERVER}, port => $ENV{AMQ_PORT} } ) or die "Failed to connect to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}"; +$amq->connect( { + login => $ENV{AMQ_USER}, + passcode => $ENV{AMQ_PASSWORD}, + # "client-id" => $conf->{feed}->{"client-id"} + } ) or die "Failed to authenticate to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}"; + + +## Connect to MQTT +my $mqtt = Net::MQTT::Simple->new($ENV{MQTT_SERVER}) or die "Failed to connect to mqtt://$ENV{MQTT_SERVER}"; + +## Subscribe to ActiveMQ topics +foreach my $topic (split (/\s*,\s*/, $ENV{AMQ_TOPICS})) { + _debug("Subscribing to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}/topic/$topic"); + $amq->subscribe( { + destination => "/topic/$topic", + ack => 'client', + + } ) or warn "Unable to subscribe to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}/topic/$topic"; +} + + +while(1) { + my $incoming = $amq->receive_frame; + if (!defined $incoming) { + _debug("Connection issues?"); + next; + } + + my $incoming_destination = $incoming->destination; + $incoming_destination =~ s~^/[^/]*/~~; + + + _debug("AMQ -> " . $incoming->destination); + _debug("MQTT <- " . $ENV{MQTT_PUBLISH_BASE} . "/" . $incoming_destination); + + $mqtt->retain($ENV{MQTT_PUBLISH_BASE} . "/" . $incoming_destination, $incoming->body); + + $amq->ack({ frame => $incoming }); + +} \ No newline at end of file