Compare commits

..

2 Commits

Author SHA1 Message Date
12e6122706 Fix indentation 2021-01-24 14:37:46 +00:00
0fcf6745f1 allow disable rabbit 2021-01-24 14:32:31 +00:00

View File

@ -50,37 +50,40 @@ foreach (
## Connect to ActiveMQ ## Connect to ActiveMQ
my $amq = Net::Stomp->new( { hostname => $ENV{AMQ_SERVER}, port => $ENV{AMQ_PORT} } ) or die "Failed to connect to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}"; my $amq = Net::Stomp->new( { hostname => $ENV{AMQ_SERVER}, port => $ENV{AMQ_PORT} } ) or die "Failed to connect to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}";
$amq->connect( { $amq->connect( {
login => $ENV{AMQ_USER}, login => $ENV{AMQ_USER},
passcode => $ENV{AMQ_PASSWORD}, passcode => $ENV{AMQ_PASSWORD},
# "client-id" => $conf->{feed}->{"client-id"} # "client-id" => $conf->{feed}->{"client-id"}
} ) or die "Failed to authenticate to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}"; } ) or die "Failed to authenticate to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}";
## Connect to MQTT ## Connect to MQTT
_debug("Connecting to mqtt://$ENV{MQTT_SERVER}"); _debug("Connecting to mqtt://$ENV{MQTT_SERVER}");
my $mqtt = Net::MQTT::Simple->new($ENV{MQTT_SERVER}) or die "Failed to connect to mqtt://$ENV{MQTT_SERVER}"; my $mqtt = Net::MQTT::Simple->new($ENV{MQTT_SERVER}) or die "Failed to connect to mqtt://$ENV{MQTT_SERVER}";
my $rabbit = Net::RabbitMQ->new(); my $rabbit;
_debug("Connecting to rabbitmq://$ENV{RABBITMQ_SERVER}"); my %rabbit_queues;
$rabbit->connect($ENV{RABBITMQ_SERVER}, unless ($ENV{DISABLE_RABBIT}) {
{ $rabbit = Net::RabbitMQ->new();
_debug("Connecting to rabbitmq://$ENV{RABBITMQ_SERVER}");
$rabbit->connect($ENV{RABBITMQ_SERVER},
{
user => $ENV{RABBITMQ_USER}, user => $ENV{RABBITMQ_USER},
password => $ENV{RABBITMQ_PASSWORD}, password => $ENV{RABBITMQ_PASSWORD},
vhost => $ENV{RABBITMQ_VHOST}, vhost => $ENV{RABBITMQ_VHOST},
channel_max => 64, channel_max => 64,
}); });
_debug("Open rabbitmq channel"); _debug("Open rabbitmq channel");
$rabbit->channel_open(1); $rabbit->channel_open(1);
my %rabbit_queues; }
## Subscribe to ActiveMQ topics ## Subscribe to ActiveMQ topics
foreach my $topic (split (/\s*,\s*/, $ENV{AMQ_TOPICS})) { foreach my $topic (split (/\s*,\s*/, $ENV{AMQ_TOPICS})) {
_debug("Subscribing to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}/topic/$topic"); _debug("Subscribing to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}/topic/$topic");
$amq->subscribe( { $amq->subscribe( {
destination => "/topic/$topic", destination => "/topic/$topic",
ack => 'client', ack => 'client',
} ) or warn "Unable to subscribe to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}/topic/$topic"; } ) or warn "Unable to subscribe to amq://$ENV{AMQ_SERVER}:$ENV{AMQ_PORT}/topic/$topic";
} }
@ -99,15 +102,16 @@ while(1) {
_debug("MQTT <- " . $ENV{MQTT_PUBLISH_BASE} . "/" . $incoming_destination); _debug("MQTT <- " . $ENV{MQTT_PUBLISH_BASE} . "/" . $incoming_destination);
$mqtt->retain($ENV{MQTT_PUBLISH_BASE} . "/" . $incoming_destination, $incoming->body); $mqtt->retain($ENV{MQTT_PUBLISH_BASE} . "/" . $incoming_destination, $incoming->body);
_debug("RABBIT <- " . $incoming->destination); unless($ENV{DISABLE_RABBIT}) {
unless($rabbit_queues{$incoming_destination}) { _debug("RABBIT <- " . $incoming->destination);
unless($rabbit_queues{$incoming_destination}) {
_debug("RABBIT Declaring queue $incoming_destination"); _debug("RABBIT Declaring queue $incoming_destination");
$rabbit->queue_declare(1, $incoming_destination, { durable => 1 }); $rabbit->queue_declare(1, $incoming_destination, { durable => 1 });
$rabbit_queues{$incoming_destination}++; $rabbit_queues{$incoming_destination}++;
}
$rabbit->publish(1, $incoming_destination, $incoming->body);
} }
$rabbit->publish(1, $incoming_destination, $incoming->body); # $amq->ack({ frame => $incoming });
$amq->ack({ frame => $incoming });
} }