#!/usr/bin/env perl use strict; use warnings; use Net::MQTT::Simple; use JSON; use Data::Dumper; $| = 1 if $ENV{DEBUG}; my $mqtt; sub assert($;$) { my ($required, $message) = @_; unless ($required) { my $msg = "Assertion failed"; $msg .= " ($message)" if $message; die $msg; } } sub _debug($) { my $msg = shift; return unless $ENV{DEBUG}; printf("%s [%s] %s\n", time, "DEBUG", $msg); } sub _error($) { my $msg = shift; printf("%s [%s] %s\n", time, "ERROR", $msg); } sub handle($$) { my ($topic, $json) = @_; my $data; eval { $data = from_json($json) }; if ($data) { _debug( "[$topic] " . summary($data) ); foreach (@{$data}) { process_message($_); } } else { _error("Got invalid JSON data from $topic: $json"); } } sub summary($) { my $json = shift; my $str = sprintf("Data blob containing %d messages", scalar @{$json}); return $str; } sub process_message($) { # CA => "Step", # CB => "Cancel", # CC => "Interpose", # CT => "Heartbeat", my $msg = shift; my $type = (keys %$msg)[0]; my %inner = %{$msg->{$type}}; if($type eq "CA_MSG") { clear_berth($inner{area_id}, $inner{from}); set_berth($inner{area_id}, $inner{to}, $inner{descr}); _debug( "STEP: $inner{area_id} $inner{to} := $inner{descr} at $inner{time}" ); } elsif($type eq 'CB_MSG') { clear_berth($inner{area_id}, $inner{from}); _debug( "CLEAR: $inner{area_id} $inner{from} at $inner{time}" ); } elsif($type eq 'CC_MSG') { set_berth($inner{area_id}, $inner{to}, $inner{descr}); _debug( "INTERPOSE: $inner{area_id} $inner{to} := $inner{descr} at $inner{time}" ); } else { # print "SKIPPING MESSAGE\n"; } } sub set_berth($$$) { my ($area, $berth, $occupant) = @_; publish("berths/$area/$berth", $occupant); } sub clear_berth($$) { my ($area, $berth) = @_; publish(sprintf("berths/%s/%s", $area||"-", $berth || "-"), undef); } sub publish($$) { my ($topic, $message) = @_; # _debug("$ENV{MQTT_PUBLISH_BASE}/$topic <-- $message"); $mqtt->retain("$ENV{MQTT_PUBLISH_BASE}/$topic", $message); } ### Check required environement variables are present foreach ( qw( MQTT_SERVER MQTT_MONITOR MQTT_PUBLISH_BASE ) ) { assert($ENV{$_}, "`$_' not available"); } ## Connect to MQTT _debug("Connecting to mqtt://$ENV{MQTT_SERVER}/$ENV{MQTT_MONITOR}"); $mqtt = Net::MQTT::Simple->new($ENV{MQTT_SERVER}) or die "Failed to connect to mqtt://$ENV{MQTT_SERVER}"; $mqtt->subscribe( "$ENV{MQTT_MONITOR}" => sub { my ($topic, $message) = @_; _debug("--> $topic"); handle($topic, $message); }, ); _debug("Ready to receive messages"); $mqtt->run();