Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/test-and-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ jobs:
distribution: 'temurin'
java-version: '11'
cache: 'maven'
- name: Run Spotless Apply
run: mvn spotless:apply
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Build with Maven
run: mvn --file pom.xml clean install
env:
Expand Down
18 changes: 18 additions & 0 deletions eclipse-java-formatter.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<profiles version="13">
<profile kind="CodeFormatterProfile" name="Custom-4spaces-NoCommentSplit" version="13">

<setting id="org.eclipse.jdt.core.formatter.tabulation.char" value="space"/>
<setting id="org.eclipse.jdt.core.formatter.tabulation.size" value="4"/>
<setting id="org.eclipse.jdt.core.formatter.indentation.size" value="4"/>
<setting id="org.eclipse.jdt.core.formatter.use_tabs_only_for_leading_indentations" value="false"/>

<setting id="org.eclipse.jdt.core.formatter.lineSplit" value="120"/>

<setting id="org.eclipse.jdt.core.formatter.comment.line_length" value="9999"/>
<setting id="org.eclipse.jdt.core.formatter.comment.format_line_comments" value="false"/>
<setting id="org.eclipse.jdt.core.formatter.comment.format_block_comments" value="false"/>
<setting id="org.eclipse.jdt.core.formatter.comment.format_javadoc_comments" value="false"/>

</profile>
</profiles>
30 changes: 29 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<common.version>2.0.3-RC6</common.version>
<common.version>2.0.3</common.version>
<spotlessMavenPlugin.version>2.43.0</spotlessMavenPlugin.version>
<googleJavaFormat.version>1.17.0</googleJavaFormat.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -86,6 +88,32 @@
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0</version>
</plugin>
<plugin>
<groupId>com.diffplug.spotless</groupId>
<artifactId>spotless-maven-plugin</artifactId>
<version>${spotlessMavenPlugin.version}</version>
<configuration>
<java>
<eclipse>
<file>${project.basedir}/eclipse-java-formatter.xml</file>
</eclipse>
</java>
</configuration>
<executions>
<execution>
<id>spotless-check</id>
<goals>
<goal>check</goal>
</goals>
</execution>
<execution>
<id>spotless-apply</id>
<goals>
<goal>apply</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>

</build>
Expand Down
194 changes: 101 additions & 93 deletions src/main/java/fi/hsl/transitdata/alert/AlertHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,17 @@ public AlertHandler(final PulsarApplicationContext context) {

public void handleMessage(final Message message) {
try {
if (!TransitdataSchema.hasProtobufSchema(message, TransitdataProperties.ProtobufSchema.TransitdataServiceAlert)) {
if (!TransitdataSchema.hasProtobufSchema(message,
TransitdataProperties.ProtobufSchema.TransitdataServiceAlert)) {
throw new Exception("Invalid protobuf schema");
}
InternalMessages.ServiceAlert alert = InternalMessages.ServiceAlert.parseFrom(message.getData());

final long timestampMs = message.getEventTime();
final long timestampSecs = timestampMs / 1000;

List<GtfsRealtime.FeedEntity> entities = createFeedEntities(alert.getBulletinsList(), globalNoServiceAlerts);
List<GtfsRealtime.FeedEntity> entities = createFeedEntities(alert.getBulletinsList(),
globalNoServiceAlerts);
GtfsRealtime.FeedMessage feedMessage = FeedMessageFactory.createFullFeedMessage(entities, timestampSecs);

sendPulsarMessage(feedMessage, timestampMs);
Expand All @@ -54,15 +56,15 @@ public void handleMessage(final Message message) {
}

private void ack(MessageId received) {
consumer.acknowledgeAsync(received)
.exceptionally(throwable -> {
log.error("Failed to ack Pulsar message", throwable);
return null;
})
.thenRun(() -> {});
consumer.acknowledgeAsync(received).exceptionally(throwable -> {
log.error("Failed to ack Pulsar message", throwable);
return null;
}).thenRun(() -> {
});
}

static List<GtfsRealtime.FeedEntity> createFeedEntities(final List<InternalMessages.Bulletin> bulletins, final boolean globalNoServiceAlerts) {
static List<GtfsRealtime.FeedEntity> createFeedEntities(final List<InternalMessages.Bulletin> bulletins,
final boolean globalNoServiceAlerts) {
return bulletins.stream().map(bulletin -> {
final Optional<GtfsRealtime.Alert> maybeAlert = createAlert(bulletin, globalNoServiceAlerts);
return maybeAlert.map(alert -> {
Expand All @@ -71,29 +73,27 @@ static List<GtfsRealtime.FeedEntity> createFeedEntities(final List<InternalMessa
builder.setAlert(alert);
return builder.build();
});
}).filter(Optional::isPresent)
.map(Optional::get)
.collect(Collectors.toList());
}).filter(Optional::isPresent).map(Optional::get).collect(Collectors.toList());
}

private static boolean bulletinAffectsAll(InternalMessages.Bulletin bulletin) {
return bulletin.getAffectsAllRoutes() || bulletin.getAffectsAllStops();
}

static Optional<GtfsRealtime.Alert> createAlert(final InternalMessages.Bulletin bulletin, final boolean globalNoServiceAlerts) {
static Optional<GtfsRealtime.Alert> createAlert(final InternalMessages.Bulletin bulletin,
final boolean globalNoServiceAlerts) {
Optional<GtfsRealtime.Alert> maybeAlert;
try {
if (bulletin.hasDisplayOnly() && bulletin.getDisplayOnly()) {
log.debug("No alert created for bulletin {} that is meant to be published only on vehicle displays", bulletin.getBulletinId());
log.debug("No alert created for bulletin {} that is meant to be published only on vehicle displays",
bulletin.getBulletinId());
return Optional.empty();
}

final long startInUtcSecs = bulletin.getValidFromUtcMs() / 1000;
final long stopInUtcSecs = bulletin.getValidToUtcMs() / 1000;
final GtfsRealtime.TimeRange timeRange = GtfsRealtime.TimeRange.newBuilder()
.setStart(startInUtcSecs)
.setEnd(stopInUtcSecs)
.build();
final GtfsRealtime.TimeRange timeRange = GtfsRealtime.TimeRange.newBuilder().setStart(startInUtcSecs)
.setEnd(stopInUtcSecs).build();

final GtfsRealtime.Alert.Builder builder = GtfsRealtime.Alert.newBuilder();
builder.addActivePeriod(timeRange);
Expand All @@ -108,15 +108,16 @@ static Optional<GtfsRealtime.Alert> createAlert(final InternalMessages.Bulletin
if (bulletin.getUrlsCount() > 0) {
builder.setUrl(toGtfsTranslatedString(bulletin.getUrlsList()));
}
final Optional<GtfsRealtime.Alert.SeverityLevel> maybeSeverityLevel = toGtfsSeverityLevel(bulletin.getPriority());
final Optional<GtfsRealtime.Alert.SeverityLevel> maybeSeverityLevel = toGtfsSeverityLevel(
bulletin.getPriority());
maybeSeverityLevel.ifPresent(builder::setSeverityLevel);

Collection<GtfsRealtime.EntitySelector> entitySelectors = entitySelectorsForBulletin(bulletin);
if (entitySelectors.isEmpty()) {
log.error("Failed to find any Informed Entities for bulletin Id {}. Discarding alert.", bulletin.getBulletinId());
log.error("Failed to find any Informed Entities for bulletin Id {}. Discarding alert.",
bulletin.getBulletinId());
maybeAlert = Optional.empty();
}
else {
} else {
builder.addAllInformedEntity(entitySelectors);
maybeAlert = Optional.of(builder.build());
}
Expand All @@ -126,21 +127,24 @@ static Optional<GtfsRealtime.Alert> createAlert(final InternalMessages.Bulletin
}

maybeAlert.ifPresent(alert -> {
final Optional<String> titleEn = alert.getHeaderText().getTranslationList().stream().filter(translation -> "en".equals(translation.getLanguage())).findAny().map(GtfsRealtime.TranslatedString.Translation::getText);
log.info("Created an alert with title {} for bulletin {}", titleEn.orElse("null"), bulletin.getBulletinId());
final Optional<String> titleEn = alert.getHeaderText().getTranslationList().stream()
.filter(translation -> "en".equals(translation.getLanguage())).findAny()
.map(GtfsRealtime.TranslatedString.Translation::getText);
log.info("Created an alert with title {} for bulletin {}", titleEn.orElse("null"),
bulletin.getBulletinId());
});

return maybeAlert;
}

static Collection<GtfsRealtime.EntitySelector> entitySelectorsForBulletin(final InternalMessages.Bulletin bulletin) {
static Collection<GtfsRealtime.EntitySelector> entitySelectorsForBulletin(
final InternalMessages.Bulletin bulletin) {
Set<GtfsRealtime.EntitySelector> selectors = new HashSet<>();
if (bulletinAffectsAll(bulletin)) {
log.debug("Bulletin {} affects all routes or stops", bulletin.getBulletinId());

GtfsRealtime.EntitySelector agency = GtfsRealtime.EntitySelector.newBuilder()
.setAgencyId(AGENCY_ENTITY_SELECTOR)
.build();
.setAgencyId(AGENCY_ENTITY_SELECTOR).build();
selectors.add(agency);
}
if (bulletin.getAffectedRoutesCount() > 0) {
Expand All @@ -162,72 +166,72 @@ static Collection<GtfsRealtime.EntitySelector> entitySelectorsForBulletin(final
return selectors;
}

private void sendPulsarMessage(final GtfsRealtime.FeedMessage feedMessage, long timestampMs) throws PulsarClientException {
private void sendPulsarMessage(final GtfsRealtime.FeedMessage feedMessage, long timestampMs)
throws PulsarClientException {
try {
producer.newMessage().value(feedMessage.toByteArray())
.eventTime(timestampMs)
.property(TransitdataProperties.KEY_PROTOBUF_SCHEMA, TransitdataProperties.ProtobufSchema.GTFS_ServiceAlert.toString())
producer.newMessage().value(feedMessage.toByteArray()).eventTime(timestampMs)
.property(TransitdataProperties.KEY_PROTOBUF_SCHEMA,
TransitdataProperties.ProtobufSchema.GTFS_ServiceAlert.toString())
.send();
log.info("Produced a new GTFS-RT service alert message with timestamp {}", timestampMs);
}
catch (PulsarClientException e) {
} catch (PulsarClientException e) {
log.error("Failed to send message to Pulsar", e);
throw e;
}
catch (Exception e) {
} catch (Exception e) {
log.error("Failed to handle alert message", e);
}
}

public static GtfsRealtime.Alert.Cause toGtfsCause(final InternalMessages.Category category) {
switch (category) {
case OTHER_DRIVER_ERROR:
case TOO_MANY_PASSENGERS:
case MISPARKED_VEHICLE:
case TEST:
case STATE_VISIT:
case TRACK_BLOCKED:
case EARLIER_DISRUPTION:
case OTHER:
case NO_TRAFFIC_DISRUPTION:
case TRAFFIC_JAM:
case PUBLIC_EVENT:
case STAFF_DEFICIT:
case DISTURBANCE:
case OTHER_DRIVER_ERROR :
case TOO_MANY_PASSENGERS :
case MISPARKED_VEHICLE :
case TEST :
case STATE_VISIT :
case TRACK_BLOCKED :
case EARLIER_DISRUPTION :
case OTHER :
case NO_TRAFFIC_DISRUPTION :
case TRAFFIC_JAM :
case PUBLIC_EVENT :
case STAFF_DEFICIT :
case DISTURBANCE :
return GtfsRealtime.Alert.Cause.OTHER_CAUSE;
case ITS_SYSTEM_ERROR:
case SWITCH_FAILURE:
case TECHNICAL_FAILURE:
case VEHICLE_BREAKDOWN:
case POWER_FAILURE:
case VEHICLE_DEFICIT:
case ITS_SYSTEM_ERROR :
case SWITCH_FAILURE :
case TECHNICAL_FAILURE :
case VEHICLE_BREAKDOWN :
case POWER_FAILURE :
case VEHICLE_DEFICIT :
return GtfsRealtime.Alert.Cause.TECHNICAL_PROBLEM;
case STRIKE:
case STRIKE :
return GtfsRealtime.Alert.Cause.STRIKE;
case VEHICLE_OFF_THE_ROAD:
case TRAFFIC_ACCIDENT:
case ACCIDENT:
case VEHICLE_OFF_THE_ROAD :
case TRAFFIC_ACCIDENT :
case ACCIDENT :
return GtfsRealtime.Alert.Cause.ACCIDENT;
case SEIZURE:
case MEDICAL_INCIDENT:
case SEIZURE :
case MEDICAL_INCIDENT :
return GtfsRealtime.Alert.Cause.MEDICAL_EMERGENCY;
case WEATHER:
case WEATHER_CONDITIONS:
case WEATHER :
case WEATHER_CONDITIONS :
return GtfsRealtime.Alert.Cause.WEATHER;
case ROAD_MAINTENANCE:
case TRACK_MAINTENANCE:
case ROAD_MAINTENANCE :
case TRACK_MAINTENANCE :
return GtfsRealtime.Alert.Cause.MAINTENANCE;
case ROAD_CLOSED:
case ROAD_TRENCH:
case ROAD_CLOSED :
case ROAD_TRENCH :
return GtfsRealtime.Alert.Cause.CONSTRUCTION;
case ASSAULT:
case ASSAULT :
return GtfsRealtime.Alert.Cause.POLICE_ACTIVITY;
default:
default :
return GtfsRealtime.Alert.Cause.UNKNOWN_CAUSE;
}
}

public static GtfsRealtime.Alert.Effect getGtfsEffect(final InternalMessages.Bulletin bulletin, final boolean globalNoServiceAlerts) {
public static GtfsRealtime.Alert.Effect getGtfsEffect(final InternalMessages.Bulletin bulletin,
final boolean globalNoServiceAlerts) {
final boolean affectsAll = bulletinAffectsAll(bulletin);
final InternalMessages.Bulletin.Impact impact = bulletin.getImpact();

Expand All @@ -242,46 +246,50 @@ public static GtfsRealtime.Alert.Effect getGtfsEffect(final InternalMessages.Bul

public static GtfsRealtime.Alert.Effect toGtfsEffect(final InternalMessages.Bulletin.Impact impact) {
switch (impact) {
case CANCELLED:
case CANCELLED :
return GtfsRealtime.Alert.Effect.NO_SERVICE;
case DELAYED:
case IRREGULAR_DEPARTURES:
case DELAYED :
case IRREGULAR_DEPARTURES :
return GtfsRealtime.Alert.Effect.SIGNIFICANT_DELAYS;
case DEVIATING_SCHEDULE:
case POSSIBLE_DEVIATIONS:
case DEVIATING_SCHEDULE :
case POSSIBLE_DEVIATIONS :
return GtfsRealtime.Alert.Effect.MODIFIED_SERVICE;
case DISRUPTION_ROUTE:
case DISRUPTION_ROUTE :
return GtfsRealtime.Alert.Effect.DETOUR;
case POSSIBLY_DELAYED:
case VENDING_MACHINE_OUT_OF_ORDER:
case RETURNING_TO_NORMAL:
case OTHER:
case POSSIBLY_DELAYED :
case VENDING_MACHINE_OUT_OF_ORDER :
case RETURNING_TO_NORMAL :
case OTHER :
return GtfsRealtime.Alert.Effect.OTHER_EFFECT;
case REDUCED_TRANSPORT:
case REDUCED_TRANSPORT :
return GtfsRealtime.Alert.Effect.REDUCED_SERVICE;
case NO_TRAFFIC_IMPACT:
case NO_TRAFFIC_IMPACT :
return GtfsRealtime.Alert.Effect.NO_EFFECT;
default:
default :
return GtfsRealtime.Alert.Effect.UNKNOWN_EFFECT;
}
}

public static Optional<GtfsRealtime.Alert.SeverityLevel> toGtfsSeverityLevel(final InternalMessages.Bulletin.Priority priority) {
public static Optional<GtfsRealtime.Alert.SeverityLevel> toGtfsSeverityLevel(
final InternalMessages.Bulletin.Priority priority) {
switch (priority) {
case INFO: return Optional.of(GtfsRealtime.Alert.SeverityLevel.INFO);
case WARNING: return Optional.of(GtfsRealtime.Alert.SeverityLevel.WARNING);
case SEVERE: return Optional.of(GtfsRealtime.Alert.SeverityLevel.SEVERE);
default: return Optional.empty();
case INFO :
return Optional.of(GtfsRealtime.Alert.SeverityLevel.INFO);
case WARNING :
return Optional.of(GtfsRealtime.Alert.SeverityLevel.WARNING);
case SEVERE :
return Optional.of(GtfsRealtime.Alert.SeverityLevel.SEVERE);
default :
return Optional.empty();
}
}

public static GtfsRealtime.TranslatedString toGtfsTranslatedString(final List<InternalMessages.Bulletin.Translation> translations) {
public static GtfsRealtime.TranslatedString toGtfsTranslatedString(
final List<InternalMessages.Bulletin.Translation> translations) {
GtfsRealtime.TranslatedString.Builder builder = GtfsRealtime.TranslatedString.newBuilder();
for (final InternalMessages.Bulletin.Translation translation: translations) {
GtfsRealtime.TranslatedString.Translation gtfsTranslation = GtfsRealtime.TranslatedString.Translation.newBuilder()
.setText(translation.getText())
.setLanguage(translation.getLanguage())
.build();
for (final InternalMessages.Bulletin.Translation translation : translations) {
GtfsRealtime.TranslatedString.Translation gtfsTranslation = GtfsRealtime.TranslatedString.Translation
.newBuilder().setText(translation.getText()).setLanguage(translation.getLanguage()).build();
builder.addTranslation(gtfsTranslation);
}
return builder.build();
Expand Down
Loading