4242
4343import java .util .ArrayList ;
4444import java .util .Arrays ;
45+ import java .util .Collections ;
46+ import java .util .HashSet ;
47+ import java .util .Iterator ;
4548import java .util .List ;
4649import java .util .Map ;
4750import java .util .Optional ;
5558/** Parser for converting YAML formatted pipeline definition to {@link PipelineDef}. */
5659public class YamlPipelineDefinitionParser implements PipelineDefinitionParser {
5760
61+ private static final String TOP_LEVEL_NAME = "top-level" ;
62+
5863 // Parent node keys
5964 private static final String SOURCE_KEY = "source" ;
6065 private static final String SINK_KEY = "sink" ;
@@ -118,6 +123,11 @@ public PipelineDef parse(String pipelineDefText, Configuration globalPipelineCon
118123
119124 private PipelineDef parse (JsonNode pipelineDefJsonNode , Configuration globalPipelineConfig )
120125 throws Exception {
126+ validateJsonNodeKeys (
127+ TOP_LEVEL_NAME ,
128+ pipelineDefJsonNode ,
129+ Arrays .asList (SOURCE_KEY , SINK_KEY ),
130+ Arrays .asList (ROUTE_KEY , TRANSFORM_KEY , PIPELINE_KEY ));
121131
122132 // UDFs are optional. We parse UDF first and remove it from the pipelineDefJsonNode since
123133 // it's not of plain data types and must be removed before calling toPipelineConfig.
@@ -126,10 +136,12 @@ private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipe
126136 if (pipelineDefJsonNode .get (PIPELINE_KEY ) != null ) {
127137 Optional .ofNullable (
128138 ((ObjectNode ) pipelineDefJsonNode .get (PIPELINE_KEY )).remove (UDF_KEY ))
139+ .map (node -> validateArray ("UDF" , node ))
129140 .ifPresent (node -> node .forEach (udf -> udfDefs .add (toUdfDef (udf ))));
130141
131142 Optional .ofNullable (
132143 ((ObjectNode ) pipelineDefJsonNode .get (PIPELINE_KEY )).remove (MODEL_KEY ))
144+ .map (node -> validateArray ("model" , node ))
133145 .ifPresent (node -> modelDefs .addAll (parseModels (node )));
134146 }
135147
@@ -159,6 +171,7 @@ private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipe
159171 // Transforms are optional
160172 List <TransformDef > transformDefs = new ArrayList <>();
161173 Optional .ofNullable (pipelineDefJsonNode .get (TRANSFORM_KEY ))
174+ .map (node -> validateArray ("transform" , node ))
162175 .ifPresent (
163176 node ->
164177 node .forEach (
@@ -167,6 +180,7 @@ private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipe
167180 // Routes are optional
168181 List <RouteDef > routeDefs = new ArrayList <>();
169182 Optional .ofNullable (pipelineDefJsonNode .get (ROUTE_KEY ))
183+ .map (node -> validateArray ("route" , node ))
170184 .ifPresent (node -> node .forEach (route -> routeDefs .add (toRouteDef (route ))));
171185
172186 // Merge user config into global config
@@ -247,6 +261,12 @@ private SinkDef toSinkDef(JsonNode sinkNode, SchemaChangeBehavior schemaChangeBe
247261 }
248262
249263 private RouteDef toRouteDef (JsonNode routeNode ) {
264+ validateJsonNodeKeys (
265+ "route" ,
266+ routeNode ,
267+ Arrays .asList (ROUTE_SOURCE_TABLE_KEY , ROUTE_SINK_TABLE_KEY ),
268+ Arrays .asList (ROUTE_REPLACE_SYMBOL , ROUTE_DESCRIPTION_KEY ));
269+
250270 String sourceTable =
251271 checkNotNull (
252272 routeNode .get (ROUTE_SOURCE_TABLE_KEY ),
@@ -271,6 +291,12 @@ private RouteDef toRouteDef(JsonNode routeNode) {
271291 }
272292
273293 private UdfDef toUdfDef (JsonNode udfNode ) {
294+ validateJsonNodeKeys (
295+ "UDF" ,
296+ udfNode ,
297+ Arrays .asList (UDF_FUNCTION_NAME_KEY , UDF_CLASSPATH_KEY ),
298+ Collections .emptyList ());
299+
274300 String functionName =
275301 checkNotNull (
276302 udfNode .get (UDF_FUNCTION_NAME_KEY ),
@@ -288,6 +314,19 @@ private UdfDef toUdfDef(JsonNode udfNode) {
288314 }
289315
290316 private TransformDef toTransformDef (JsonNode transformNode ) {
317+ validateJsonNodeKeys (
318+ "transform" ,
319+ transformNode ,
320+ Collections .singletonList (TRANSFORM_SOURCE_TABLE_KEY ),
321+ Arrays .asList (
322+ TRANSFORM_PROJECTION_KEY ,
323+ TRANSFORM_FILTER_KEY ,
324+ TRANSFORM_PRIMARY_KEY_KEY ,
325+ TRANSFORM_PARTITION_KEY_KEY ,
326+ TRANSFORM_TABLE_OPTION_KEY ,
327+ TRANSFORM_DESCRIPTION_KEY ,
328+ TRANSFORM_CONVERTER_AFTER_TRANSFORM_KEY ));
329+
291330 String sourceTable =
292331 checkNotNull (
293332 transformNode .get (TRANSFORM_SOURCE_TABLE_KEY ),
@@ -377,4 +416,57 @@ private ModelDef convertJsonNodeToModelDef(JsonNode modelNode) {
377416 Map <String , String > properties = mapper .convertValue (modelNode , Map .class );
378417 return new ModelDef (name , model , properties );
379418 }
419+
420+ private void validateJsonNodeKeys (
421+ String contextName ,
422+ JsonNode jsonNode ,
423+ List <String > requiredKeys ,
424+ List <String > optionalKeys )
425+ throws IllegalArgumentException {
426+ List <String > validKeys = new ArrayList <>(requiredKeys );
427+ Set <String > presentedKeys = new HashSet <>();
428+ validKeys .addAll (optionalKeys );
429+
430+ for (Iterator <String > it = jsonNode .fieldNames (); it .hasNext (); ) {
431+ String key = it .next ();
432+ presentedKeys .add (key );
433+ if (!validKeys .contains (key )) {
434+ if (TOP_LEVEL_NAME .equals (contextName )) {
435+ throw new IllegalArgumentException (
436+ String .format (
437+ "Unexpected key `%s` in YAML top-level block.\n "
438+ + "Allowed keys in this context are: %s\n "
439+ + "Note: Flink configurations should be defined in \" Runtime Configurations\" instead of YAML scripts." ,
440+ key , validKeys ));
441+ } else {
442+ throw new IllegalArgumentException (
443+ String .format (
444+ "Unexpected key `%s` in YAML %s block.\n "
445+ + "Allowed keys in this context are: %s\n "
446+ + "Note: option %s: %s is unexpected. It was silently ignored in previous versions, and probably should be removed." ,
447+ key , contextName , validKeys , key , jsonNode .get (key )));
448+ }
449+ }
450+ }
451+
452+ for (String key : requiredKeys ) {
453+ if (!presentedKeys .contains (key )) {
454+ throw new IllegalArgumentException (
455+ String .format (
456+ "Missing required field \" %s\" in %s configuration" ,
457+ key , contextName ));
458+ }
459+ }
460+ }
461+
462+ private JsonNode validateArray (String contextName , JsonNode jsonNode ) {
463+ if (jsonNode .isArray ()) {
464+ return jsonNode ;
465+ } else {
466+ throw new IllegalArgumentException (
467+ String .format (
468+ "YAML %s block is expecting an array children, but got an %s (%s). Perhaps you missed a dash prefix `-`?" ,
469+ contextName , jsonNode .getNodeType (), jsonNode ));
470+ }
471+ }
380472}
0 commit comments