republish events

This commit is contained in:
Matthew Slowe 2021-01-24 13:52:46 +00:00
parent 84d352ad4d
commit d1d0157bc5

View File

@ -7,6 +7,9 @@ use Net::MQTT::Simple;
use JSON; use JSON;
use Data::Dumper; use Data::Dumper;
$| = 1 if $ENV{DEBUG};
my $mqtt;
sub assert($;$) { sub assert($;$) {
my ($required, $message) = @_; my ($required, $message) = @_;
unless ($required) { unless ($required) {
@ -36,17 +39,67 @@ sub handle($$) {
}; };
if ($data) { if ($data) {
_debug( Dumper($data) ); # _debug( "[$topic] " . summary($data) );
foreach (@{$data}) {
process_message($_);
}
} else { } else {
_error("Got invalid JSON data from $topic: $json"); _error("Got invalid JSON data from $topic: $json");
} }
} }
sub summary($) {
my $json = shift;
my $str = sprintf("Data blob containing %d messages", scalar @{$json});
return $str;
}
sub process_message($) {
# CA => "Step",
# CB => "Cancel",
# CC => "Interpose",
# CT => "Heartbeat",
my $msg = shift;
my $type = (keys %$msg)[0];
my %inner = %{$msg->{$type}};
if($type eq "CA_MSG") {
clear_berth($inner{area_id}, $inner{from});
set_berth($inner{area_id}, $inner{to}, $inner{descr});
# "STEP: $inner{area_id} $inner{to} := $inner{descr} at $inner{time}\n";
} elsif($type eq 'CB_MSG') {
clear_berth($inner{area_id}, $inner{from});
# "CLEAR: $inner{area_id} $inner{from} at $inner{time}\n";
} elsif($type eq 'CC_MSG') {
set_berth($inner{area_id}, $inner{to}, $inner{descr});
# "INTERPOSE: $inner{area_id} $inner{to} := $inner{descr} at $inner{time}\n";
} else {
# print "SKIPPING MESSAGE\n";
}
}
sub set_berth($$$) {
my ($area, $berth, $occupant) = @_;
publish("berths/$area/$berth", $occupant);
}
sub clear_berth($$) {
my ($area, $berth) = @_;
publish(sprintf("berths/%s/%s", $area||"-", $berth || "-"), "-");
}
sub publish($$) {
my ($topic, $message) = @_;
# _debug("$ENV{MQTT_PUBLISH_BASE}/$topic <-- $message");
$mqtt->retain("$ENV{MQTT_PUBLISH_BASE}/$topic", $message);
}
### Check required environement variables are present ### Check required environement variables are present
foreach ( foreach (
qw( qw(
MQTT_SERVER MQTT_SERVER
MQTT_MONITOR MQTT_MONITOR
MQTT_PUBLISH_BASE
) )
) { ) {
assert($ENV{$_}, "`$_' not available"); assert($ENV{$_}, "`$_' not available");
@ -54,7 +107,7 @@ foreach (
## Connect to MQTT ## Connect to MQTT
_debug("Connecting to mqtt://$ENV{MQTT_SERVER}/$ENV{MQTT_MONITOR}"); _debug("Connecting to mqtt://$ENV{MQTT_SERVER}/$ENV{MQTT_MONITOR}");
my $mqtt = Net::MQTT::Simple->new($ENV{MQTT_SERVER}) or die "Failed to connect to mqtt://$ENV{MQTT_SERVER}"; $mqtt = Net::MQTT::Simple->new($ENV{MQTT_SERVER}) or die "Failed to connect to mqtt://$ENV{MQTT_SERVER}";
$mqtt->subscribe( $mqtt->subscribe(
"$ENV{MQTT_MONITOR}" => sub { "$ENV{MQTT_MONITOR}" => sub {