network-rail-updater/update-berths.pl

122 lines
2.8 KiB
Perl
Raw Normal View History

2021-01-23 17:27:16 +00:00
#!/usr/bin/env perl
use strict;
use warnings;
use Net::MQTT::Simple;
use JSON;
use Data::Dumper;
2021-01-24 13:52:46 +00:00
$| = 1 if $ENV{DEBUG};
my $mqtt;
2021-01-23 17:27:16 +00:00
sub assert($;$) {
my ($required, $message) = @_;
unless ($required) {
my $msg = "Assertion failed";
$msg .= " ($message)" if $message;
die $msg;
}
}
sub _debug($) {
my $msg = shift;
return unless $ENV{DEBUG};
printf("%s [%s] %s\n", time, "DEBUG", $msg);
}
sub _error($) {
my $msg = shift;
printf("%s [%s] %s\n", time, "ERROR", $msg);
}
sub handle($$) {
my ($topic, $json) = @_;
my $data;
eval {
$data = from_json($json)
};
if ($data) {
2021-01-24 14:32:02 +00:00
_debug( "[$topic] " . summary($data) );
2021-01-24 13:52:46 +00:00
foreach (@{$data}) {
process_message($_);
}
2021-01-23 17:27:16 +00:00
} else {
_error("Got invalid JSON data from $topic: $json");
}
}
2021-01-24 13:52:46 +00:00
sub summary($) {
my $json = shift;
my $str = sprintf("Data blob containing %d messages", scalar @{$json});
return $str;
}
sub process_message($) {
# CA => "Step",
# CB => "Cancel",
# CC => "Interpose",
# CT => "Heartbeat",
my $msg = shift;
my $type = (keys %$msg)[0];
my %inner = %{$msg->{$type}};
if($type eq "CA_MSG") {
clear_berth($inner{area_id}, $inner{from});
set_berth($inner{area_id}, $inner{to}, $inner{descr});
2021-01-24 14:32:02 +00:00
_debug( "STEP: $inner{area_id} $inner{to} := $inner{descr} at $inner{time}" );
2021-01-24 13:52:46 +00:00
} elsif($type eq 'CB_MSG') {
clear_berth($inner{area_id}, $inner{from});
2021-01-24 14:32:02 +00:00
_debug( "CLEAR: $inner{area_id} $inner{from} at $inner{time}" );
2021-01-24 13:52:46 +00:00
} elsif($type eq 'CC_MSG') {
set_berth($inner{area_id}, $inner{to}, $inner{descr});
2021-01-24 14:32:02 +00:00
_debug( "INTERPOSE: $inner{area_id} $inner{to} := $inner{descr} at $inner{time}" );
2021-01-24 13:52:46 +00:00
} else {
# print "SKIPPING MESSAGE\n";
}
}
sub set_berth($$$) {
my ($area, $berth, $occupant) = @_;
publish("berths/$area/$berth", $occupant);
}
sub clear_berth($$) {
my ($area, $berth) = @_;
publish(sprintf("berths/%s/%s", $area||"-", $berth || "-"), "-");
}
sub publish($$) {
my ($topic, $message) = @_;
# _debug("$ENV{MQTT_PUBLISH_BASE}/$topic <-- $message");
$mqtt->retain("$ENV{MQTT_PUBLISH_BASE}/$topic", $message);
}
2021-01-23 17:27:16 +00:00
### Check required environement variables are present
foreach (
qw(
MQTT_SERVER
MQTT_MONITOR
2021-01-24 13:52:46 +00:00
MQTT_PUBLISH_BASE
2021-01-23 17:27:16 +00:00
)
) {
assert($ENV{$_}, "`$_' not available");
}
## Connect to MQTT
_debug("Connecting to mqtt://$ENV{MQTT_SERVER}/$ENV{MQTT_MONITOR}");
2021-01-24 13:52:46 +00:00
$mqtt = Net::MQTT::Simple->new($ENV{MQTT_SERVER}) or die "Failed to connect to mqtt://$ENV{MQTT_SERVER}";
2021-01-23 17:27:16 +00:00
$mqtt->subscribe(
"$ENV{MQTT_MONITOR}" => sub {
my ($topic, $message) = @_;
2021-01-24 14:36:45 +00:00
_debug("--> $topic");
2021-01-23 17:27:16 +00:00
handle($topic, $message);
},
);
2021-01-24 14:32:02 +00:00
_debug("Ready to receive messages");
2021-01-23 17:27:16 +00:00
2021-01-24 14:32:02 +00:00
$mqtt->run();