&#010;Assert on query failed: name: The code passed to eventually never returned normally. Attempted 1306 times over 1.0633471959666667 minutes. Last failure message: clock.isStreamWaitingAt(clock.getTimeMillis()) was false.&#010;org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:421)&#010; org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:439)&#010; org.apache.spark.sql.kafka010.KafkaSourceTest.eventually(KafkaMicroBatchSourceSuite.scala:49)&#010; org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:337)&#010; org.apache.spark.sql.kafka010.KafkaSourceTest.eventually(KafkaMicroBatchSourceSuite.scala:49)&#010; org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$14$$anonfun$46.apply(KafkaMicroBatchSourceSuite.scala:782)&#010; org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$14$$anonfun$46.apply(KafkaMicroBatchSourceSuite.scala:781)&#010; org.apache.spark.sql.streaming.StreamTest$Execute$$anonfun$apply$10.apply(StreamTest.scala:297)&#010; org.apache.spark.sql.streaming.StreamTest$Execute$$anonfun$apply$10.apply(StreamTest.scala:297)&#010; org.apache.spark.sql.streaming.StreamTest$$anonfun$executeAction$1$11.apply$mcZ$sp(StreamTest.scala:651)&#010;&#010; Caused by: clock.isStreamWaitingAt(clock.getTimeMillis()) was false&#010; org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:528)&#010; org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)&#010; org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:501)&#010; org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$14$$anonfun$46$$anonfun$apply$30.apply(KafkaMicroBatchSourceSuite.scala:784)&#010; org.scalatest.concurrent.Eventually$class.makeAValiantAttempt$1(Eventually.scala:395)&#010; org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:409)&#010; org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:439)&#010; org.apache.spark.sql.kafka010.KafkaSourceTest.eventually(KafkaMicroBatchSourceSuite.scala:49)&#010; org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:337)&#010; org.apache.spark.sql.kafka010.KafkaSourceTest.eventually(KafkaMicroBatchSourceSuite.scala:49)&#010;&#010;&#010;== Progress ==&#010; StartStream(ProcessingTime(100),org.apache.spark.sql.streaming.util.StreamManualClock@66b8f244,Map(),null)&#010; AssertOnQuery(<condition>, name)&#010; CheckAnswer: &#010; AssertOnQuery(<condition>, name)&#010; AdvanceManualClock(100)&#010; AssertOnQuery(<condition>, name)&#010; CheckNewAnswer: &#010; AssertOnQuery(<condition>, name)&#010; AdvanceManualClock(100)&#010; AssertOnQuery(<condition>, name)&#010; CheckNewAnswer: [0],[1],[2]&#010; AdvanceManualClock(100)&#010; AssertOnQuery(<condition>, name)&#010; CheckNewAnswer: [3],[4]&#010; AssertOnQuery(<condition>, name)&#010; AdvanceManualClock(100)&#010; AssertOnQuery(<condition>, name)&#010; CheckNewAnswer: &#010; AdvanceManualClock(100)&#010;=> AssertOnQuery(<condition>, name)&#010; CheckNewAnswer: &#010; AssertOnQuery(<condition>, name)&#010; AdvanceManualClock(100)&#010; AssertOnQuery(<condition>, name)&#010; CheckNewAnswer: [12],[13],[14]&#010; AdvanceManualClock(100)&#010; AssertOnQuery(<condition>, name)&#010; CheckNewAnswer: [15],[16]&#010; AssertOnQuery(<condition>, name)&#010; AdvanceManualClock(100)&#010; AssertOnQuery(<condition>, name)&#010; CheckNewAnswer: [18],[20]&#010; AdvanceManualClock(100)&#010; AssertOnQuery(<condition>, name)&#010; CheckNewAnswer: [22],[23]&#010; AdvanceManualClock(100)&#010; AssertOnQuery(<condition>, name)&#010; CheckNewAnswer: &#010;&#010;== Stream ==&#010;Output Mode: Append&#010;Stream state: {KafkaV2[Subscribe[topic-34]]: {"topic-34":{"0":9}}}&#010;Thread state: alive&#010;Thread stack trace: com.fasterxml.jackson.core.json.ReaderBasedJsonParser.getText(ReaderBasedJsonParser.java:280)&#010;com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:35)&#010;com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:10)&#010;com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:530)&#010;com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:528)&#010;com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:417)&#010;com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1287)&#010;com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:326)&#010;com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:159)&#010;com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4013)&#010;com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3004)&#010;org.apache.spark.rdd.RDDOperationScope$.fromJson(RDDOperationScope.scala:86)&#010;org.apache.spark.rdd.RDDOperationScope$$anonfun$5.apply(RDDOperationScope.scala:137)&#010;org.apache.spark.rdd.RDDOperationScope$$anonfun$5.apply(RDDOperationScope.scala:137)&#010;scala.Option.map(Option.scala:146)&#010;org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:137)&#010;org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1747)&#010;org.apache.spark.SparkContext.runJob(SparkContext.scala:2015)&#010;org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)&#010;org.apache.spark.SparkContext.runJob(SparkContext.scala:2053)&#010;org.apache.spark.SparkContext.runJob(SparkContext.scala:2078)&#010;org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:961)&#010;org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)&#010;org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)&#010;org.apache.spark.rdd.RDD.withScope(RDD.scala:366)&#010;org.apache.spark.rdd.RDD.collect(RDD.scala:960)&#010;org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:299)&#010;org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3371)&#010;org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2720)&#010;org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2720)&#010;org.apache.spark.sql.Dataset$$anonfun$withAction$1.apply(Dataset.scala:3360)&#010;org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:100)&#010;org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)&#010;org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)&#010;org.apache.spark.sql.Dataset.withAction(Dataset.scala:3356)&#010;org.apache.spark.sql.Dataset.collect(Dataset.scala:2720)&#010;org.apache.spark.sql.execution.streaming.MemorySink.addBatch(memory.scala:324)&#010;org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$12$$anonfun$apply$18.apply(MicroBatchExecution.scala:552)&#010;org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$12$$anonfun$apply$18.apply(MicroBatchExecution.scala:550)&#010;org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:100)&#010;org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)&#010;org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)&#010;org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$12.apply(MicroBatchExecution.scala:550)&#010;org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$12.apply(MicroBatchExecution.scala:550)&#010;org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:327)&#010;org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)&#010;org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:549)&#010;org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:210)&#010;org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:178)&#010;org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:178)&#010;org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:327)&#010;org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)&#010;org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:178)&#010;org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)&#010;org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:172)&#010;org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:331)&#010;org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:243)&#010;&#010;&#010;== Sink ==&#010;0: &#010;1: [0] [1] [2]&#010;2: [3] [4]&#010;3: &#010;&#010;&#010;== Plan ==&#010;== Parsed Logical Plan ==&#010;SerializeFromObject [input[0, int, false] AS value#2541]&#010;+- MapElements <function1>, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#2540: int&#010; +- DeserializeToObject newInstance(class scala.Tuple2), obj#2539: scala.Tuple2&#010; +- Project [cast(key#2515 as string) AS key#2529, cast(value#2516 as string) AS value#2530]&#010; +- StreamingDataSourceV2Relation [key#2515, value#2516, topic#2517, partition#2518, offset#2519L, timestamp#2520, timestampType#2521], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@6ae6193f, KafkaV2[Subscribe[topic-34]], {"topic-34":{"0":9}}, {"topic-34":{"0":12}}&#010;&#010;== Analyzed Logical Plan ==&#010;value: int&#010;SerializeFromObject [input[0, int, false] AS value#2541]&#010;+- MapElements <function1>, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#2540: int&#010; +- DeserializeToObject newInstance(class scala.Tuple2), obj#2539: scala.Tuple2&#010; +- Project [cast(key#2515 as string) AS key#2529, cast(value#2516 as string) AS value#2530]&#010; +- StreamingDataSourceV2Relation [key#2515, value#2516, topic#2517, partition#2518, offset#2519L, timestamp#2520, timestampType#2521], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@6ae6193f, KafkaV2[Subscribe[topic-34]], {"topic-34":{"0":9}}, {"topic-34":{"0":12}}&#010;&#010;== Optimized Logical Plan ==&#010;SerializeFromObject [input[0, int, false] AS value#2541]&#010;+- MapElements <function1>, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#2540: int&#010; +- DeserializeToObject newInstance(class scala.Tuple2), obj#2539: scala.Tuple2&#010; +- Project [cast(key#2515 as string) AS key#2529, cast(value#2516 as string) AS value#2530]&#010; +- StreamingDataSourceV2Relation [key#2515, value#2516, topic#2517, partition#2518, offset#2519L, timestamp#2520, timestampType#2521], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@6ae6193f, KafkaV2[Subscribe[topic-34]], {"topic-34":{"0":9}}, {"topic-34":{"0":12}}&#010;&#010;== Physical Plan ==&#010;*(1) SerializeFromObject [input[0, int, false] AS value#2541]&#010;+- *(1) MapElements <function1>, obj#2540: int&#010; +- *(1) DeserializeToObject newInstance(class scala.Tuple2), obj#2539: scala.Tuple2&#010; +- *(1) Project [cast(key#2515 as string) AS key#2529, cast(value#2516 as string) AS value#2530]&#010; +- *(1) Project [key#2515, value#2516, topic#2517, partition#2518, offset#2519L, timestamp#2520, timestampType#2521]&#010; +- *(1) MicroBatchScan[key#2515, value#2516, topic#2517, partition#2518, offset#2519L, timestamp#2520, timestampType#2521] class org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan&#010;&#010; &#010;


      org.scalatest.exceptions.TestFailedException: 
Assert on query failed: name: The code passed to eventually never returned normally. Attempted 1306 times over 1.0633471959666667 minutes. Last failure message: clock.isStreamWaitingAt(clock.getTimeMillis()) was false.
org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:421)
	org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:439)
	org.apache.spark.sql.kafka010.KafkaSourceTest.eventually(KafkaMicroBatchSourceSuite.scala:49)
	org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:337)
	org.apache.spark.sql.kafka010.KafkaSourceTest.eventually(KafkaMicroBatchSourceSuite.scala:49)
	org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$14$$anonfun$46.apply(KafkaMicroBatchSourceSuite.scala:782)
	org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$14$$anonfun$46.apply(KafkaMicroBatchSourceSuite.scala:781)
	org.apache.spark.sql.streaming.StreamTest$Execute$$anonfun$apply$10.apply(StreamTest.scala:297)
	org.apache.spark.sql.streaming.StreamTest$Execute$$anonfun$apply$10.apply(StreamTest.scala:297)
	org.apache.spark.sql.streaming.StreamTest$$anonfun$executeAction$1$11.apply$mcZ$sp(StreamTest.scala:651)

	Caused by: 	clock.isStreamWaitingAt(clock.getTimeMillis()) was false
	org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:528)
		org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
		org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:501)
		org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$14$$anonfun$46$$anonfun$apply$30.apply(KafkaMicroBatchSourceSuite.scala:784)
		org.scalatest.concurrent.Eventually$class.makeAValiantAttempt$1(Eventually.scala:395)
		org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:409)
		org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:439)
		org.apache.spark.sql.kafka010.KafkaSourceTest.eventually(KafkaMicroBatchSourceSuite.scala:49)
		org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:337)
		org.apache.spark.sql.kafka010.KafkaSourceTest.eventually(KafkaMicroBatchSourceSuite.scala:49)


== Progress ==
   StartStream(ProcessingTime(100),org.apache.spark.sql.streaming.util.StreamManualClock@66b8f244,Map(),null)
   AssertOnQuery(<condition>, name)
   CheckAnswer: 
   AssertOnQuery(<condition>, name)
   AdvanceManualClock(100)
   AssertOnQuery(<condition>, name)
   CheckNewAnswer: 
   AssertOnQuery(<condition>, name)
   AdvanceManualClock(100)
   AssertOnQuery(<condition>, name)
   CheckNewAnswer: [0],[1],[2]
   AdvanceManualClock(100)
   AssertOnQuery(<condition>, name)
   CheckNewAnswer: [3],[4]
   AssertOnQuery(<condition>, name)
   AdvanceManualClock(100)
   AssertOnQuery(<condition>, name)
   CheckNewAnswer: 
   AdvanceManualClock(100)
=> AssertOnQuery(<condition>, name)
   CheckNewAnswer: 
   AssertOnQuery(<condition>, name)
   AdvanceManualClock(100)
   AssertOnQuery(<condition>, name)
   CheckNewAnswer: [12],[13],[14]
   AdvanceManualClock(100)
   AssertOnQuery(<condition>, name)
   CheckNewAnswer: [15],[16]
   AssertOnQuery(<condition>, name)
   AdvanceManualClock(100)
   AssertOnQuery(<condition>, name)
   CheckNewAnswer: [18],[20]
   AdvanceManualClock(100)
   AssertOnQuery(<condition>, name)
   CheckNewAnswer: [22],[23]
   AdvanceManualClock(100)
   AssertOnQuery(<condition>, name)
   CheckNewAnswer: 

== Stream ==
Output Mode: Append
Stream state: {KafkaV2[Subscribe[topic-34]]: {"topic-34":{"0":9}}}
Thread state: alive
Thread stack trace: com.fasterxml.jackson.core.json.ReaderBasedJsonParser.getText(ReaderBasedJsonParser.java:280)
com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:35)
com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:10)
com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:530)
com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:528)
com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:417)
com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1287)
com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:326)
com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:159)
com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4013)
com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3004)
org.apache.spark.rdd.RDDOperationScope$.fromJson(RDDOperationScope.scala:86)
org.apache.spark.rdd.RDDOperationScope$$anonfun$5.apply(RDDOperationScope.scala:137)
org.apache.spark.rdd.RDDOperationScope$$anonfun$5.apply(RDDOperationScope.scala:137)
scala.Option.map(Option.scala:146)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:137)
org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1747)
org.apache.spark.SparkContext.runJob(SparkContext.scala:2015)
org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
org.apache.spark.SparkContext.runJob(SparkContext.scala:2053)
org.apache.spark.SparkContext.runJob(SparkContext.scala:2078)
org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:961)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
org.apache.spark.rdd.RDD.withScope(RDD.scala:366)
org.apache.spark.rdd.RDD.collect(RDD.scala:960)
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:299)
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3371)
org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2720)
org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2720)
org.apache.spark.sql.Dataset$$anonfun$withAction$1.apply(Dataset.scala:3360)
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:100)
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
org.apache.spark.sql.Dataset.withAction(Dataset.scala:3356)
org.apache.spark.sql.Dataset.collect(Dataset.scala:2720)
org.apache.spark.sql.execution.streaming.MemorySink.addBatch(memory.scala:324)
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$12$$anonfun$apply$18.apply(MicroBatchExecution.scala:552)
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$12$$anonfun$apply$18.apply(MicroBatchExecution.scala:550)
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:100)
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$12.apply(MicroBatchExecution.scala:550)
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$12.apply(MicroBatchExecution.scala:550)
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:327)
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:549)
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:210)
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:178)
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:178)
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:327)
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:178)
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:172)
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:331)
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:243)


== Sink ==
0: 
1: [0] [1] [2]
2: [3] [4]
3: 


== Plan ==
== Parsed Logical Plan ==
SerializeFromObject [input[0, int, false] AS value#2541]
+- MapElements <function1>, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#2540: int
   +- DeserializeToObject newInstance(class scala.Tuple2), obj#2539: scala.Tuple2
      +- Project [cast(key#2515 as string) AS key#2529, cast(value#2516 as string) AS value#2530]
         +- StreamingDataSourceV2Relation [key#2515, value#2516, topic#2517, partition#2518, offset#2519L, timestamp#2520, timestampType#2521], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@6ae6193f, KafkaV2[Subscribe[topic-34]], {"topic-34":{"0":9}}, {"topic-34":{"0":12}}

== Analyzed Logical Plan ==
value: int
SerializeFromObject [input[0, int, false] AS value#2541]
+- MapElements <function1>, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#2540: int
   +- DeserializeToObject newInstance(class scala.Tuple2), obj#2539: scala.Tuple2
      +- Project [cast(key#2515 as string) AS key#2529, cast(value#2516 as string) AS value#2530]
         +- StreamingDataSourceV2Relation [key#2515, value#2516, topic#2517, partition#2518, offset#2519L, timestamp#2520, timestampType#2521], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@6ae6193f, KafkaV2[Subscribe[topic-34]], {"topic-34":{"0":9}}, {"topic-34":{"0":12}}

== Optimized Logical Plan ==
SerializeFromObject [input[0, int, false] AS value#2541]
+- MapElements <function1>, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#2540: int
   +- DeserializeToObject newInstance(class scala.Tuple2), obj#2539: scala.Tuple2
      +- Project [cast(key#2515 as string) AS key#2529, cast(value#2516 as string) AS value#2530]
         +- StreamingDataSourceV2Relation [key#2515, value#2516, topic#2517, partition#2518, offset#2519L, timestamp#2520, timestampType#2521], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@6ae6193f, KafkaV2[Subscribe[topic-34]], {"topic-34":{"0":9}}, {"topic-34":{"0":12}}

== Physical Plan ==
*(1) SerializeFromObject [input[0, int, false] AS value#2541]
+- *(1) MapElements <function1>, obj#2540: int
   +- *(1) DeserializeToObject newInstance(class scala.Tuple2), obj#2539: scala.Tuple2
      +- *(1) Project [cast(key#2515 as string) AS key#2529, cast(value#2516 as string) AS value#2530]
         +- *(1) Project [key#2515, value#2516, topic#2517, partition#2518, offset#2519L, timestamp#2520, timestampType#2521]
            +- *(1) MicroBatchScan[key#2515, value#2516, topic#2517, partition#2518, offset#2519L, timestamp#2520, timestampType#2521] class org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan

         
         
      at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:528)
      at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
      at org.scalatest.Assertions$class.fail(Assertions.scala:1089)
      at org.scalatest.FunSuite.fail(FunSuite.scala:1560)
      at org.apache.spark.sql.streaming.StreamTest$class.failTest$1(StreamTest.scala:452)
      at org.apache.spark.sql.streaming.StreamTest$class.executeAction$1(StreamTest.scala:654)
      at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:778)
      at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:765)
      at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
      at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
      at org.apache.spark.sql.streaming.StreamTest$class.liftedTree1$1(StreamTest.scala:765)
      at org.apache.spark.sql.streaming.StreamTest$class.testStream(StreamTest.scala:764)
      at org.apache.spark.sql.kafka010.KafkaSourceTest.testStream(KafkaMicroBatchSourceSuite.scala:49)
      at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$14$$anonfun$apply$mcV$sp$25.apply(KafkaMicroBatchSourceSuite.scala:795)
      at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$14$$anonfun$apply$mcV$sp$25.apply(KafkaMicroBatchSourceSuite.scala:794)
      at org.apache.spark.sql.kafka010.KafkaTestUtils.withTranscationalProducer(KafkaTestUtils.scala:361)
      at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$14.apply$mcV$sp(KafkaMicroBatchSourceSuite.scala:794)
      at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$14.apply(KafkaMicroBatchSourceSuite.scala:750)
      at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$14.apply(KafkaMicroBatchSourceSuite.scala:750)
      at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
      at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
      at org.scalatest.Transformer.apply(Transformer.scala:22)
      at org.scalatest.Transformer.apply(Transformer.scala:20)
      at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
      at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:104)
      at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
      at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
      at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
      at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
      at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
      at org.apache.spark.sql.kafka010.KafkaSourceTest.org$scalatest$BeforeAndAfterEach$$super$runTest(KafkaMicroBatchSourceSuite.scala:49)
      at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:221)
      at org.apache.spark.sql.kafka010.KafkaSourceTest.runTest(KafkaMicroBatchSourceSuite.scala:49)
      at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
      at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
      at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
      at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
      at scala.collection.immutable.List.foreach(List.scala:392)
      at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
      at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
      at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
      at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
      at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
      at org.scalatest.Suite$class.run(Suite.scala:1147)
      at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
      at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
      at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
      at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
      at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
      at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:53)
      at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
      at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
      at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:53)
      at org.scalatest.Suite$class.callExecuteOnSuite$1(Suite.scala:1210)
      at org.scalatest.Suite$$anonfun$runNestedSuites$1.apply(Suite.scala:1257)
      at org.scalatest.Suite$$anonfun$runNestedSuites$1.apply(Suite.scala:1255)
      at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
      at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
      at org.scalatest.Suite$class.runNestedSuites(Suite.scala:1255)
      at org.scalatest.tools.DiscoverySuite.runNestedSuites(DiscoverySuite.scala:30)
      at org.scalatest.Suite$class.run(Suite.scala:1144)
      at org.scalatest.tools.DiscoverySuite.run(DiscoverySuite.scala:30)
      at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
      at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1346)
      at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1340)
      at scala.collection.immutable.List.foreach(List.scala:392)
      at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1340)
      at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1011)
      at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1010)
      at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1506)
      at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
      at org.scalatest.tools.Runner$.main(Runner.scala:827)
      at org.scalatest.tools.Runner.main(Runner.scala)