diff --git a/update-berths.pl b/update-berths.pl index 931aec2..d35cc5b 100755 --- a/update-berths.pl +++ b/update-berths.pl @@ -7,6 +7,9 @@ use Net::MQTT::Simple; use JSON; use Data::Dumper; +$| = 1 if $ENV{DEBUG}; +my $mqtt; + sub assert($;$) { my ($required, $message) = @_; unless ($required) { @@ -36,17 +39,67 @@ sub handle($$) { }; if ($data) { - _debug( Dumper($data) ); + # _debug( "[$topic] " . summary($data) ); + foreach (@{$data}) { + process_message($_); + } } else { _error("Got invalid JSON data from $topic: $json"); } } +sub summary($) { + my $json = shift; + my $str = sprintf("Data blob containing %d messages", scalar @{$json}); + return $str; +} + +sub process_message($) { + # CA => "Step", + # CB => "Cancel", + # CC => "Interpose", + # CT => "Heartbeat", + my $msg = shift; + my $type = (keys %$msg)[0]; + my %inner = %{$msg->{$type}}; + if($type eq "CA_MSG") { + clear_berth($inner{area_id}, $inner{from}); + set_berth($inner{area_id}, $inner{to}, $inner{descr}); + + # "STEP: $inner{area_id} $inner{to} := $inner{descr} at $inner{time}\n"; + } elsif($type eq 'CB_MSG') { + clear_berth($inner{area_id}, $inner{from}); + # "CLEAR: $inner{area_id} $inner{from} at $inner{time}\n"; + } elsif($type eq 'CC_MSG') { + set_berth($inner{area_id}, $inner{to}, $inner{descr}); + # "INTERPOSE: $inner{area_id} $inner{to} := $inner{descr} at $inner{time}\n"; + } else { + # print "SKIPPING MESSAGE\n"; + } +} + +sub set_berth($$$) { + my ($area, $berth, $occupant) = @_; + publish("berths/$area/$berth", $occupant); +} + +sub clear_berth($$) { + my ($area, $berth) = @_; + publish(sprintf("berths/%s/%s", $area||"-", $berth || "-"), "-"); +} + +sub publish($$) { + my ($topic, $message) = @_; + # _debug("$ENV{MQTT_PUBLISH_BASE}/$topic <-- $message"); + $mqtt->retain("$ENV{MQTT_PUBLISH_BASE}/$topic", $message); +} + ### Check required environement variables are present foreach ( qw( MQTT_SERVER MQTT_MONITOR + MQTT_PUBLISH_BASE ) ) { assert($ENV{$_}, "`$_' not available"); @@ -54,7 +107,7 @@ foreach ( ## Connect to MQTT _debug("Connecting to mqtt://$ENV{MQTT_SERVER}/$ENV{MQTT_MONITOR}"); -my $mqtt = Net::MQTT::Simple->new($ENV{MQTT_SERVER}) or die "Failed to connect to mqtt://$ENV{MQTT_SERVER}"; +$mqtt = Net::MQTT::Simple->new($ENV{MQTT_SERVER}) or die "Failed to connect to mqtt://$ENV{MQTT_SERVER}"; $mqtt->subscribe( "$ENV{MQTT_MONITOR}" => sub {