Working activemq -> mqtt proxy
This commit is contained in:
		
							
								
								
									
										84
									
								
								bridge.pl
									
									
									
									
									
										Executable file
									
								
							
							
						
						
									
										84
									
								
								bridge.pl
									
									
									
									
									
										Executable file
									
								
							| @@ -0,0 +1,84 @@ | ||||
| #!/usr/bin/perl | ||||
|  | ||||
| # use warnings; | ||||
| # use strict; | ||||
|  | ||||
| use Net::Stomp; | ||||
| use Net::MQTT::Simple; | ||||
| use AnyEvent::RabbitMQ; | ||||
|  | ||||
| use Data::Dumper; | ||||
|  | ||||
| sub assert($;$) { | ||||
|     my ($required, $message) = @_; | ||||
|     unless ($required) { | ||||
|         my $msg = "Assertion failed"; | ||||
|         $msg .= " ($message)" if $message; | ||||
|         die $msg; | ||||
|     } | ||||
| } | ||||
|  | ||||
| sub _debug($) { | ||||
|     my $msg = shift; | ||||
|     printf("%s [%s] %s\n", time, "DEBUG", $msg); | ||||
| } | ||||
|  | ||||
| ### Check required environement variables are present | ||||
| foreach ( | ||||
|     qw( | ||||
|         AMQ_SERVER | ||||
|         AMQ_PORT | ||||
|         AMQ_USER | ||||
|         AMQ_PASSWORD | ||||
|         AMQ_TOPICS | ||||
|  | ||||
|         MQTT_SERVER | ||||
|         MQTT_PUBLISH_BASE | ||||
|     ) | ||||
| ) { | ||||
|     assert($ENV{$_}, "`$_' not available"); | ||||
| } | ||||
|  | ||||
|  | ||||
| ## Connect to ActiveMQ | ||||
| my $amq = Net::Stomp->new( { hostname => $ENV{AMQ_SERVER}, port => $ENV{AMQ_PORT} } ) or die "Failed to connect to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}"; | ||||
| $amq->connect( { | ||||
| 		login => $ENV{AMQ_USER}, | ||||
| 		passcode => $ENV{AMQ_PASSWORD}, | ||||
| 		# "client-id" =>  $conf->{feed}->{"client-id"} | ||||
| 	} ) or die "Failed to authenticate to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}"; | ||||
|  | ||||
|  | ||||
| ## Connect to MQTT | ||||
| my $mqtt = Net::MQTT::Simple->new($ENV{MQTT_SERVER}) or die "Failed to connect to mqtt://$ENV{MQTT_SERVER}"; | ||||
|  | ||||
| ## Subscribe to ActiveMQ topics | ||||
| foreach my $topic (split (/\s*,\s*/, $ENV{AMQ_TOPICS})) { | ||||
|     _debug("Subscribing to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}/topic/$topic"); | ||||
| 	$amq->subscribe( { | ||||
| 		destination => "/topic/$topic", | ||||
|         ack         => 'client', | ||||
|  | ||||
| 	} ) or warn "Unable to subscribe to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}/topic/$topic"; | ||||
| } | ||||
|  | ||||
|  | ||||
| while(1) { | ||||
|     my $incoming = $amq->receive_frame; | ||||
|     if (!defined $incoming) { | ||||
|         _debug("Connection issues?"); | ||||
|         next; | ||||
|     } | ||||
|  | ||||
|     my $incoming_destination = $incoming->destination; | ||||
|     $incoming_destination =~ s~^/[^/]*/~~; | ||||
|  | ||||
|  | ||||
|     _debug("AMQ  -> " . $incoming->destination); | ||||
|     _debug("MQTT <- " . $ENV{MQTT_PUBLISH_BASE} . "/" . $incoming_destination); | ||||
|  | ||||
|     $mqtt->retain($ENV{MQTT_PUBLISH_BASE} . "/" . $incoming_destination, $incoming->body); | ||||
|  | ||||
|     $amq->ack({ frame => $incoming }); | ||||
|  | ||||
| } | ||||
		Reference in New Issue
	
	Block a user
	 Matthew Slowe
					Matthew Slowe