Skip to content

Commit 66a4632

Browse files
committed
refactored flight generator
1 parent a51a5c8 commit 66a4632

File tree

6 files changed

+285
-94
lines changed

6 files changed

+285
-94
lines changed

common/models/build.gradle.kts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ plugins {
55
dependencies {
66
// Add model-specific dependencies here
77
implementation("org.apache.avro:avro:1.11.3")
8+
implementation("net.datafaker:datafaker:2.1.0")
9+
implementation("org.slf4j:slf4j-api:2.0.17")
810

911
// Test dependencies
1012
testImplementation(platform("org.junit:junit-bom:5.10.0"))
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
package io.confluent.developer.models.generator;
2+
3+
import io.confluent.developer.models.flight.Flight;
4+
import net.datafaker.Faker;
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
7+
8+
import java.time.ZoneId;
9+
import java.time.ZonedDateTime;
10+
import java.util.Arrays;
11+
import java.util.List;
12+
import java.util.Random;
13+
import java.util.concurrent.TimeUnit;
14+
15+
/**
16+
* Generator for creating random flight data.
17+
* This class encapsulates the logic for generating random flight information.
18+
*/
19+
public class FlightGenerator {
20+
private static final Logger LOG = LoggerFactory.getLogger(FlightGenerator.class);
21+
22+
private final Random random;
23+
private final Faker faker;
24+
25+
// Lists of airports and airlines for random selection
26+
private static final List<String> AIRPORTS = Arrays.asList(
27+
"ATL", "LAX", "ORD", "DFW", "DEN", "JFK", "SFO", "SEA", "LAS", "MCO",
28+
"EWR", "CLT", "PHX", "IAH", "MIA", "BOS", "MSP", "DTW", "FLL", "PHL"
29+
);
30+
31+
private static final List<String> AIRLINES = Arrays.asList(
32+
"Delta", "American", "United", "Southwest", "JetBlue",
33+
"Alaska", "Spirit", "Frontier", "Hawaiian", "Allegiant"
34+
);
35+
36+
private static final List<String> STATUSES = Arrays.asList(
37+
"ON_TIME", "DELAYED", "CANCELLED", "BOARDING", "IN_AIR", "LANDED", "DIVERTED"
38+
);
39+
40+
/**
41+
* Constructor with default seed.
42+
*/
43+
public FlightGenerator() {
44+
this(System.currentTimeMillis());
45+
}
46+
47+
/**
48+
* Constructor with specific seed for reproducible results.
49+
*
50+
* @param seed the random seed to use
51+
*/
52+
public FlightGenerator(long seed) {
53+
this.random = new Random(seed);
54+
this.faker = new Faker(random);
55+
LOG.info("FlightGenerator initialized with seed: {}", seed);
56+
}
57+
58+
/**
59+
* Generates a random flight record.
60+
*
61+
* @return A Flight object with random data
62+
*/
63+
public Flight generateFlight() {
64+
// Generate a random flight number
65+
String flightNumber = faker.aviation().flight();
66+
67+
// Select random origin and destination airports
68+
String origin = AIRPORTS.get(random.nextInt(AIRPORTS.size()));
69+
String destination;
70+
do {
71+
destination = AIRPORTS.get(random.nextInt(AIRPORTS.size()));
72+
} while (destination.equals(origin)); // Ensure origin and destination are different
73+
74+
// Select a random airline
75+
String airline = AIRLINES.get(random.nextInt(AIRLINES.size()));
76+
77+
// Generate departure times
78+
ZonedDateTime now = ZonedDateTime.now(ZoneId.of("UTC"));
79+
ZonedDateTime scheduledDeparture = now.plusHours(random.nextInt(48)); // Random time in next 48 hours
80+
81+
long scheduledDepartureMillis = scheduledDeparture.toInstant().toEpochMilli();
82+
Long actualDepartureMillis = null;
83+
84+
// Determine if the flight has departed
85+
boolean hasDeparted = random.nextDouble() < 0.7; // 70% chance the flight has departed
86+
87+
if (hasDeparted) {
88+
// For departed flights, actual departure could be on time, early, or delayed
89+
int minutesOffset = random.nextInt(60) - 15; // -15 to +45 minutes
90+
actualDepartureMillis = scheduledDepartureMillis + TimeUnit.MINUTES.toMillis(minutesOffset);
91+
}
92+
93+
// Determine flight status
94+
String status;
95+
if (!hasDeparted) {
96+
// Flight hasn't departed yet
97+
status = random.nextDouble() < 0.9 ? "SCHEDULED" : "CANCELLED";
98+
} else {
99+
// Flight has departed
100+
status = STATUSES.get(random.nextInt(STATUSES.size()));
101+
}
102+
103+
// Create and return the Flight object
104+
Flight.Builder builder = Flight.newBuilder()
105+
.setFlightNumber(flightNumber)
106+
.setAirline(airline)
107+
.setOrigin(origin)
108+
.setDestination(destination)
109+
.setScheduledDeparture(scheduledDepartureMillis)
110+
.setStatus(status);
111+
112+
if (actualDepartureMillis != null) {
113+
builder.setActualDeparture(actualDepartureMillis);
114+
}
115+
116+
Flight flight = builder.build();
117+
LOG.debug("Generated flight: {}", flight);
118+
return flight;
119+
}
120+
121+
/**
122+
* Gets a list of random airports used by this generator.
123+
*
124+
* @return List of airport codes
125+
*/
126+
public List<String> getAirports() {
127+
return AIRPORTS;
128+
}
129+
130+
/**
131+
* Gets a list of airlines used by this generator.
132+
*
133+
* @return List of airline names
134+
*/
135+
public List<String> getAirlines() {
136+
return AIRLINES;
137+
}
138+
139+
/**
140+
* Gets a list of possible flight statuses.
141+
*
142+
* @return List of possible flight statuses
143+
*/
144+
public List<String> getStatuses() {
145+
return STATUSES;
146+
}
147+
}
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
package io.confluent.developer.models.generator;
2+
3+
import io.confluent.developer.models.flight.Flight;
4+
import org.junit.jupiter.api.Test;
5+
import org.junit.jupiter.params.ParameterizedTest;
6+
import org.junit.jupiter.params.provider.ValueSource;
7+
8+
import java.util.HashSet;
9+
import java.util.Set;
10+
11+
import static org.assertj.core.api.Assertions.assertThat;
12+
13+
class FlightGeneratorTest {
14+
15+
@Test
16+
void shouldGenerateRandomFlight() {
17+
// Given
18+
FlightGenerator generator = new FlightGenerator(42L); // fixed seed for reproducibility
19+
20+
// When
21+
Flight flight = generator.generateFlight();
22+
23+
// Then
24+
assertThat(flight).isNotNull();
25+
assertThat(flight.getFlightNumber()).isNotNull().isNotEmpty();
26+
assertThat(flight.getAirline()).isNotNull().isNotEmpty();
27+
assertThat(flight.getOrigin()).isNotNull().isNotEmpty();
28+
assertThat(flight.getDestination()).isNotNull().isNotEmpty();
29+
assertThat(flight.getStatus()).isNotNull().isNotEmpty();
30+
assertThat(flight.getScheduledDeparture()).isGreaterThan(0L);
31+
}
32+
33+
@Test
34+
void shouldGenerateUniqueFlights() {
35+
// Given
36+
FlightGenerator generator = new FlightGenerator();
37+
Set<String> flightNumbers = new HashSet<>();
38+
int flightCount = 100;
39+
40+
// When
41+
for (int i = 0; i < flightCount; i++) {
42+
Flight flight = generator.generateFlight();
43+
flightNumbers.add(flight.getFlightNumber());
44+
}
45+
46+
// Then - not all flight numbers should be unique due to random generation,
47+
// but we should have a good amount of diversity
48+
assertThat(flightNumbers.size()).isGreaterThan((int)(flightCount * 0.8));
49+
}
50+
51+
@Test
52+
void originAndDestinationShouldBeDifferent() {
53+
// Given
54+
FlightGenerator generator = new FlightGenerator();
55+
int flightCount = 100;
56+
57+
// When/Then
58+
for (int i = 0; i < flightCount; i++) {
59+
Flight flight = generator.generateFlight();
60+
assertThat(flight.getOrigin()).isNotEqualTo(flight.getDestination());
61+
}
62+
}
63+
64+
@Test
65+
void shouldProvideValidAirportsList() {
66+
// Given
67+
FlightGenerator generator = new FlightGenerator();
68+
69+
// When
70+
var airports = generator.getAirports();
71+
72+
// Then
73+
assertThat(airports).isNotNull().isNotEmpty();
74+
assertThat(airports).allMatch(airport -> airport.length() == 3);
75+
}
76+
77+
@Test
78+
void shouldProvideValidAirlinesList() {
79+
// Given
80+
FlightGenerator generator = new FlightGenerator();
81+
82+
// When
83+
var airlines = generator.getAirlines();
84+
85+
// Then
86+
assertThat(airlines).isNotNull().isNotEmpty();
87+
assertThat(airlines).allMatch(airline -> !airline.isEmpty());
88+
}
89+
90+
@Test
91+
void shouldProvideValidStatusesList() {
92+
// Given
93+
FlightGenerator generator = new FlightGenerator();
94+
95+
// When
96+
var statuses = generator.getStatuses();
97+
98+
// Then
99+
assertThat(statuses).isNotNull().isNotEmpty();
100+
assertThat(statuses).contains("DELAYED", "CANCELLED", "ON_TIME");
101+
}
102+
103+
@ParameterizedTest
104+
@ValueSource(longs = {1L, 42L, 100L, 999L})
105+
void shouldBeDeterministicWithSeed(long seed) {
106+
// Given
107+
FlightGenerator generator1 = new FlightGenerator(seed);
108+
FlightGenerator generator2 = new FlightGenerator(seed);
109+
110+
// When
111+
Flight flight1 = generator1.generateFlight();
112+
Flight flight2 = generator2.generateFlight();
113+
114+
// Then
115+
assertThat(flight1.getFlightNumber()).isEqualTo(flight2.getFlightNumber());
116+
assertThat(flight1.getAirline()).isEqualTo(flight2.getAirline());
117+
assertThat(flight1.getOrigin()).isEqualTo(flight2.getOrigin());
118+
assertThat(flight1.getDestination()).isEqualTo(flight2.getDestination());
119+
assertThat(flight1.getStatus()).isEqualTo(flight2.getStatus());
120+
121+
// Due to timing differences, we don't strictly compare timestamps
122+
// Time-based values may differ slightly due to execution timing
123+
124+
// If actual departure exists in one flight, it should exist in both
125+
if (flight1.getActualDeparture() != null) {
126+
assertThat(flight2.getActualDeparture()).isNotNull();
127+
}
128+
}
129+
}

flink-data-generator/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ dependencies {
2222
implementation("org.apache.flink:flink-connector-kafka:3.4.0-1.20")
2323
implementation("org.apache.flink:flink-avro:$flinkVersion")
2424
implementation("org.apache.flink:flink-connector-base:$flinkVersion")
25+
implementation("org.apache.flink:flink-core:$flinkVersion")
2526

2627
// Confluent
2728
implementation("io.confluent:kafka-schema-registry-client:$confluentVersion")

flink-data-generator/src/main/java/io/confluent/developer/generator/DataGeneratorJob.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,13 @@
33
import io.confluent.developer.models.flight.Flight;
44
import io.confluent.developer.serialization.FlightAvroSerializationSchema;
55
import io.confluent.developer.utils.ConfigUtils;
6-
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
76
import org.apache.flink.api.java.utils.ParameterTool;
87
import org.apache.flink.connector.kafka.sink.KafkaSink;
98
import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
109
import org.apache.flink.streaming.api.datastream.DataStream;
1110
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
1211
import org.slf4j.Logger;
1312
import org.slf4j.LoggerFactory;
14-
1513
import java.util.Properties;
1614

1715
/**

0 commit comments

Comments
 (0)