Skip to content

Commit e4467c8

Browse files
authored
Use less ancient avro, maintain same wire format by @radai-rosenblatt (#359)
1 parent 77f9655 commit e4467c8

File tree

5 files changed

+23
-26
lines changed

5 files changed

+23
-26
lines changed

build.gradle

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,19 +27,24 @@ allprojects {
2727

2828
repositories {
2929
mavenCentral()
30+
maven {
31+
url "https://linkedin.jfrog.io/artifactory/avro-util/"
32+
}
3033
}
3134

3235
dependencies {
3336
compile 'net.sourceforge.argparse4j:argparse4j:0.5.0'
3437
compile 'org.apache.logging.log4j:log4j-slf4j-impl:2.17.1'
35-
compile 'org.apache.avro:avro:1.4.1'
38+
compile 'org.apache.avro:avro:1.9.2'
3639
compile 'org.json:json:20140107'
3740
compile 'org.jolokia:jolokia-jvm:1.6.2'
3841
compile 'net.savantly:graphite-client:1.1.0-RELEASE'
3942
compile 'com.timgroup:java-statsd-client:3.0.1'
4043
compile 'com.signalfx.public:signalfx-codahale:0.0.47'
4144
compile group: 'org.apache.kafka', name: 'kafka_2.12', version: '2.4.0'
4245
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.3.1'
46+
compile 'org.apache.commons:commons-lang3:3.12.0'
47+
compile 'com.linkedin.avroutil1:helper-all:0.2.81'
4348
testCompile 'org.mockito:mockito-core:2.24.0'
4449
testCompile 'org.testng:testng:6.8.8'
4550
}

src/main/java/com/linkedin/xinfra/monitor/common/Utils.java

Lines changed: 14 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@
1313
import com.fasterxml.jackson.core.JsonProcessingException;
1414
import com.fasterxml.jackson.databind.ObjectMapper;
1515
import com.fasterxml.jackson.databind.ObjectWriter;
16-
import java.io.ByteArrayOutputStream;
16+
import com.linkedin.avroutil1.compatibility.AvroCodecUtil;
17+
import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
18+
import com.linkedin.avroutil1.compatibility.AvroVersion;
1719
import java.io.IOException;
1820
import java.lang.management.ManagementFactory;
1921
import java.time.Duration;
@@ -34,18 +36,16 @@
3436
import javax.management.ObjectName;
3537
import kafka.admin.BrokerMetadata;
3638
import org.apache.avro.generic.GenericData;
37-
import org.apache.avro.generic.GenericDatumWriter;
39+
import org.apache.avro.generic.GenericDatumReader;
3840
import org.apache.avro.generic.GenericRecord;
39-
import org.apache.avro.io.Encoder;
40-
import org.apache.avro.io.JsonEncoder;
41+
import org.apache.avro.io.Decoder;
4142
import org.apache.kafka.clients.admin.AdminClient;
4243
import org.apache.kafka.clients.admin.CreateTopicsResult;
4344
import org.apache.kafka.clients.admin.ListPartitionReassignmentsResult;
4445
import org.apache.kafka.clients.admin.NewTopic;
4546
import org.apache.kafka.clients.admin.PartitionReassignment;
4647
import org.apache.kafka.common.TopicPartition;
4748
import org.apache.kafka.common.errors.TopicExistsException;
48-
import org.json.JSONObject;
4949
import org.slf4j.Logger;
5050
import org.slf4j.LoggerFactory;
5151

@@ -224,29 +224,21 @@ public static String jsonFromFields(String topic, long idx, long timestamp, Stri
224224
* @return GenericRecord that is de-serialized from kafka message w.r.t. expected schema.
225225
*/
226226
public static GenericRecord genericRecordFromJson(String message) {
227-
GenericRecord record = new GenericData.Record(DefaultTopicSchema.MESSAGE_V0);
228-
JSONObject jsonObject = new JSONObject(message);
229-
record.put(DefaultTopicSchema.TOPIC_FIELD.name(), jsonObject.getString(DefaultTopicSchema.TOPIC_FIELD.name()));
230-
record.put(DefaultTopicSchema.INDEX_FIELD.name(), jsonObject.getLong(DefaultTopicSchema.INDEX_FIELD.name()));
231-
record.put(DefaultTopicSchema.TIME_FIELD.name(), jsonObject.getLong(DefaultTopicSchema.TIME_FIELD.name()));
232-
record.put(DefaultTopicSchema.PRODUCER_ID_FIELD.name(),
233-
jsonObject.getString(DefaultTopicSchema.PRODUCER_ID_FIELD.name()));
234-
record.put(DefaultTopicSchema.CONTENT_FIELD.name(), jsonObject.getString(DefaultTopicSchema.CONTENT_FIELD.name()));
235-
return record;
227+
try {
228+
Decoder jsonDecoder = AvroCompatibilityHelper.newCompatibleJsonDecoder(DefaultTopicSchema.MESSAGE_V0, message);
229+
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(DefaultTopicSchema.MESSAGE_V0, DefaultTopicSchema.MESSAGE_V0);
230+
return reader.read(null, jsonDecoder);
231+
} catch (Exception e) {
232+
throw new IllegalStateException("unable to deserialize " + message, e);
233+
}
236234
}
237235

238236
public static String jsonFromGenericRecord(GenericRecord record) {
239-
ByteArrayOutputStream out = new ByteArrayOutputStream();
240-
GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(DefaultTopicSchema.MESSAGE_V0);
241-
242237
try {
243-
Encoder encoder = new JsonEncoder(DefaultTopicSchema.MESSAGE_V0, out);
244-
writer.write(record, encoder);
245-
encoder.flush();
238+
return AvroCodecUtil.serializeJson(record, AvroVersion.AVRO_1_4);
246239
} catch (IOException e) {
247-
LOG.error("Unable to serialize avro record due to error " + e);
240+
throw new IllegalStateException("Unable to serialize avro record due to error: " + record, e);
248241
}
249-
return out.toString();
250242
}
251243

252244
public static List<MbeanAttributeValue> getMBeanAttributeValues(String mbeanExpr, String attributeExpr) {

src/main/java/com/linkedin/xinfra/monitor/services/GraphiteMetricsReporterService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import net.savantly.graphite.GraphiteClient;
2424
import net.savantly.graphite.GraphiteClientFactory;
2525
import net.savantly.graphite.impl.SimpleCarbonMetric;
26-
import org.apache.commons.lang.StringUtils;
26+
import org.apache.commons.lang3.StringUtils;
2727
import org.slf4j.Logger;
2828
import org.slf4j.LoggerFactory;
2929

src/main/java/com/linkedin/xinfra/monitor/services/SignalFxMetricsReporterService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import java.util.concurrent.Executors;
2020
import java.util.concurrent.ScheduledExecutorService;
2121
import java.util.concurrent.TimeUnit;
22-
import org.apache.commons.lang.StringUtils;
22+
import org.apache.commons.lang3.StringUtils;
2323
import org.slf4j.Logger;
2424
import org.slf4j.LoggerFactory;
2525

src/main/java/com/linkedin/xinfra/monitor/services/StatsdMetricsReporterService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import java.util.concurrent.Executors;
2121
import java.util.concurrent.ScheduledExecutorService;
2222
import java.util.concurrent.TimeUnit;
23-
import org.apache.commons.lang.StringUtils;
23+
import org.apache.commons.lang3.StringUtils;
2424
import org.slf4j.Logger;
2525
import org.slf4j.LoggerFactory;
2626

0 commit comments

Comments
 (0)