      org.scalatest.exceptions.TestFailedException: 
Assert on query failed: name: The code passed to eventually never returned normally. Attempted 317 times over 2.8661059611499997 minutes. Last failure message: clock.isStreamWaitingAt(clock.getTimeMillis()) was false.
org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:421)
	org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:439)
	org.apache.spark.sql.kafka010.KafkaSourceTest.eventually(KafkaMicroBatchSourceSuite.scala:47)
	org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:337)
	org.apache.spark.sql.kafka010.KafkaSourceTest.eventually(KafkaMicroBatchSourceSuite.scala:47)
	org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$11$$anonfun$37.apply(KafkaMicroBatchSourceSuite.scala:647)
	org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$11$$anonfun$37.apply(KafkaMicroBatchSourceSuite.scala:646)
	org.apache.spark.sql.streaming.StreamTest$Execute$$anonfun$apply$10.apply(StreamTest.scala:298)
	org.apache.spark.sql.streaming.StreamTest$Execute$$anonfun$apply$10.apply(StreamTest.scala:298)
	org.apache.spark.sql.streaming.StreamTest$$anonfun$executeAction$1$11.apply$mcZ$sp(StreamTest.scala:652)

	Caused by: 	clock.isStreamWaitingAt(clock.getTimeMillis()) was false
	org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:528)
		org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
		org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:501)
		org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$11$$anonfun$37$$anonfun$apply$31.apply(KafkaMicroBatchSourceSuite.scala:649)
		org.scalatest.concurrent.Eventually$class.makeAValiantAttempt$1(Eventually.scala:395)
		org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:409)
		org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:439)
		org.apache.spark.sql.kafka010.KafkaSourceTest.eventually(KafkaMicroBatchSourceSuite.scala:47)
		org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:337)
		org.apache.spark.sql.kafka010.KafkaSourceTest.eventually(KafkaMicroBatchSourceSuite.scala:47)
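
The assertion that eventually keeps retrying is the usual "wait until the stream is parked on the manual clock" check used with StreamManualClock. A minimal sketch of that pattern, assuming Spark's StreamTest harness (illustrative names, not necessarily the suite's exact code):

    // Inside a suite extending StreamTest which, per the stack trace, mixes in
    // ScalaTest's Eventually (providing `eventually` and `timeout`).
    import org.scalatest.time.{Seconds, Span}
    import org.apache.spark.sql.streaming.util.StreamManualClock

    val clock = new StreamManualClock
    val waitUntilBatchProcessed = AssertOnQuery { q =>
      eventually(timeout(Span(60, Seconds))) {
        if (q.exception.isEmpty) {
          // Retried until the stream thread is parked on the manual clock,
          // i.e. the previously triggered micro-batch has fully completed.
          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
        }
      }
      if (q.exception.isDefined) throw q.exception.get
      true
    }

The failure above therefore means a micro-batch never finished within eventually's patience, which is consistent with the thread dump under "== Stream ==" below: the stream thread is still inside MemorySink.addBatch, waiting on a Spark job, and so never parks on the clock.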


== Progress ==
   StartStream(ProcessingTime(100),org.apache.spark.sql.streaming.util.StreamManualClock@50005205,Map(),null)
   AssertOnQuery(<condition>, name)
   CheckAnswer: 
   AssertOnQuery(<condition>, name)
   AdvanceManualClock(100)
   AssertOnQuery(<condition>, name)
   CheckNewAnswer: 
   AssertOnQuery(<condition>, name)
   AdvanceManualClock(100)
   AssertOnQuery(<condition>, name)
   CheckNewAnswer: [0],[1],[2]
   AdvanceManualClock(100)
   AssertOnQuery(<condition>, name)
   CheckNewAnswer: [3],[4]
   AssertOnQuery(<condition>, name)
   AdvanceManualClock(100)
   AssertOnQuery(<condition>, name)
   CheckNewAnswer: 
   AdvanceManualClock(100)
=> AssertOnQuery(<condition>, name)
   CheckNewAnswer: 
   AssertOnQuery(<condition>, name)
   AdvanceManualClock(100)
   AssertOnQuery(<condition>, name)
   CheckNewAnswer: [12],[13],[14]
   AdvanceManualClock(100)
   AssertOnQuery(<condition>, name)
   CheckNewAnswer: [15],[16]
   AssertOnQuery(<condition>, name)
   AdvanceManualClock(100)
   AssertOnQuery(<condition>, name)
   CheckNewAnswer: [18],[20]
   AdvanceManualClock(100)
   AssertOnQuery(<condition>, name)
   CheckNewAnswer: [22],[23]
   AdvanceManualClock(100)
   AssertOnQuery(<condition>, name)
   CheckNewAnswer: 
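
The "== Progress ==" block above is StreamTest's rendering of the scripted actions, with "=>" marking the action that failed. Reconstructed as source, the opening of the scenario would look roughly like this (a sketch using Spark's StreamTest DSL; `mapped`, `clock` and `waitUntilBatchProcessed` are the assumed names from the sketch earlier, and only the first few actions are shown):

    // Assumes the suite's implicit encoders are in scope for CheckNewAnswer(0, 1, 2) etc.
    testStream(mapped)(
      StartStream(Trigger.ProcessingTime(100), clock),
      waitUntilBatchProcessed,   // AssertOnQuery(<condition>, name)
      CheckAnswer(),             // nothing committed yet
      waitUntilBatchProcessed,
      AdvanceManualClock(100),   // let the 100 ms trigger fire once
      waitUntilBatchProcessed,
      CheckNewAnswer(),
      waitUntilBatchProcessed,
      AdvanceManualClock(100),
      waitUntilBatchProcessed,
      CheckNewAnswer(0, 1, 2),   // first non-empty batch in the sink
      AdvanceManualClock(100),
      waitUntilBatchProcessed,
      CheckNewAnswer(3, 4)
      // remaining actions as listed above
    )

The marked AssertOnQuery is the wait immediately after the fifth AdvanceManualClock(100): the clock was advanced, but the triggered micro-batch never completed, so the wait timed out.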

== Stream ==
Output Mode: Append
Stream state: {KafkaSourceV1[Subscribe[topic-31]]: {"topic-31":{"0":9}}}
Thread state: alive
Thread stack trace: sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:206)
scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222)
scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:157)
org.apache.spark.util.ThreadUtils$.awaitReady(ThreadUtils.scala:243)
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:731)
org.apache.spark.SparkContext.runJob(SparkContext.scala:2073)
org.apache.spark.SparkContext.runJob(SparkContext.scala:2094)
org.apache.spark.SparkContext.runJob(SparkContext.scala:2113)
org.apache.spark.SparkContext.runJob(SparkContext.scala:2138)
org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
org.apache.spark.rdd.RDD.collect(RDD.scala:944)
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:297)
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3384)
org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2783)
org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2783)
org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
org.apache.spark.sql.Dataset.collect(Dataset.scala:2783)
org.apache.spark.sql.execution.streaming.MemorySink.addBatch(memory.scala:277)
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$18.apply(MicroBatchExecution.scala:535)
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:533)
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:323)
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:532)
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:195)
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:163)
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:163)
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:323)
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:163)
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:157)
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)


== Sink ==
0: 
1: [0] [1] [2]
2: [3] [4]
3: 


== Plan ==
== Parsed Logical Plan ==
SerializeFromObject [input[0, int, false] AS value#8974]
+- MapElements <function1>, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#8973: int
   +- DeserializeToObject newInstance(class scala.Tuple2), obj#8972: scala.Tuple2
      +- Project [cast(key#8950 as string) AS key#8964, cast(value#8951 as string) AS value#8965]
         +- Project [key#9065 AS key#8950, value#9066 AS value#8951, topic#9067 AS topic#8952, partition#9068 AS partition#8953, offset#9069L AS offset#8954L, timestamp#9070 AS timestamp#8955, timestampType#9071 AS timestampType#8956]
            +- LogicalRDD [key#9065, value#9066, topic#9067, partition#9068, offset#9069L, timestamp#9070, timestampType#9071], true

== Analyzed Logical Plan ==
value: int
SerializeFromObject [input[0, int, false] AS value#8974]
+- MapElements <function1>, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#8973: int
   +- DeserializeToObject newInstance(class scala.Tuple2), obj#8972: scala.Tuple2
      +- Project [cast(key#8950 as string) AS key#8964, cast(value#8951 as string) AS value#8965]
         +- Project [key#9065 AS key#8950, value#9066 AS value#8951, topic#9067 AS topic#8952, partition#9068 AS partition#8953, offset#9069L AS offset#8954L, timestamp#9070 AS timestamp#8955, timestampType#9071 AS timestampType#8956]
            +- LogicalRDD [key#9065, value#9066, topic#9067, partition#9068, offset#9069L, timestamp#9070, timestampType#9071], true

== Optimized Logical Plan ==
SerializeFromObject [input[0, int, false] AS value#8974]
+- MapElements <function1>, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#8973: int
   +- DeserializeToObject newInstance(class scala.Tuple2), obj#8972: scala.Tuple2
      +- Project [cast(key#9065 as string) AS key#8964, cast(value#9066 as string) AS value#8965]
         +- LogicalRDD [key#9065, value#9066, topic#9067, partition#9068, offset#9069L, timestamp#9070, timestampType#9071], true

== Physical Plan ==
*(1) SerializeFromObject [input[0, int, false] AS value#8974]
+- *(1) MapElements <function1>, obj#8973: int
   +- *(1) DeserializeToObject newInstance(class scala.Tuple2), obj#8972: scala.Tuple2
      +- *(1) Project [cast(key#9065 as string) AS key#8964, cast(value#9066 as string) AS value#8965]
         +- Scan ExistingRDD kafka[key#9065,value#9066,topic#9067,partition#9068,offset#9069L,timestamp#9070,timestampType#9071]
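
A streaming query whose parsed/analyzed plan matches the dump above (cast key and value to string, deserialize into a Tuple2, then MapElements producing an int) would be built roughly as follows. This is a sketch, not the suite's code: the broker address is a placeholder and the read_committed option is an assumption suggested by withTranscationalProducer in the stack trace below; only the subscribed topic comes from the "Stream state" line above.

    // Assumes a SparkSession `spark` with `import spark.implicits._` in scope.
    val values = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")  // placeholder address
      .option("kafka.isolation.level", "read_committed")    // assumption (transactional test)
      .option("subscribe", "topic-31")                       // from "Stream state" above
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")  // Project with casts
      .as[(String, String)]                                  // DeserializeToObject Tuple2
      .map(_._2.toInt)                                       // MapElements <function1>, obj: int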
         
         
      at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:528)
      at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
      at org.scalatest.Assertions$class.fail(Assertions.scala:1089)
      at org.scalatest.FunSuite.fail(FunSuite.scala:1560)
      at org.apache.spark.sql.streaming.StreamTest$class.failTest$1(StreamTest.scala:453)
      at org.apache.spark.sql.streaming.StreamTest$class.executeAction$1(StreamTest.scala:655)
      at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:773)
      at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:760)
      at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
      at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
      at org.apache.spark.sql.streaming.StreamTest$class.liftedTree1$1(StreamTest.scala:760)
      at org.apache.spark.sql.streaming.StreamTest$class.testStream(StreamTest.scala:759)
      at org.apache.spark.sql.kafka010.KafkaSourceTest.testStream(KafkaMicroBatchSourceSuite.scala:47)
      at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$11$$anonfun$apply$mcV$sp$14.apply(KafkaMicroBatchSourceSuite.scala:660)
      at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$11$$anonfun$apply$mcV$sp$14.apply(KafkaMicroBatchSourceSuite.scala:659)
      at org.apache.spark.sql.kafka010.KafkaTestUtils.withTranscationalProducer(KafkaTestUtils.scala:357)
      at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$11.apply$mcV$sp(KafkaMicroBatchSourceSuite.scala:659)
      at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$11.apply(KafkaMicroBatchSourceSuite.scala:615)
      at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$11.apply(KafkaMicroBatchSourceSuite.scala:615)
      at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
      at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
      at org.scalatest.Transformer.apply(Transformer.scala:22)
      at org.scalatest.Transformer.apply(Transformer.scala:20)
      at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
      at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:103)
      at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
      at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
      at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
      at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
      at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
      at org.apache.spark.sql.kafka010.KafkaSourceTest.org$scalatest$BeforeAndAfterEach$$super$runTest(KafkaMicroBatchSourceSuite.scala:47)
      at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:221)
      at org.apache.spark.sql.kafka010.KafkaSourceTest.runTest(KafkaMicroBatchSourceSuite.scala:47)
      at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
      at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
      at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
      at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
      at scala.collection.immutable.List.foreach(List.scala:392)
      at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
      at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
      at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
      at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
      at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
      at org.scalatest.Suite$class.run(Suite.scala:1147)
      at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
      at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
      at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
      at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
      at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
      at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:52)
      at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
      at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
      at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:52)
      at org.scalatest.Suite$class.callExecuteOnSuite$1(Suite.scala:1210)
      at org.scalatest.Suite$$anonfun$runNestedSuites$1.apply(Suite.scala:1257)
      at org.scalatest.Suite$$anonfun$runNestedSuites$1.apply(Suite.scala:1255)
      at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
      at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
      at org.scalatest.Suite$class.runNestedSuites(Suite.scala:1255)
      at org.scalatest.tools.DiscoverySuite.runNestedSuites(DiscoverySuite.scala:30)
      at org.scalatest.Suite$class.run(Suite.scala:1144)
      at org.scalatest.tools.DiscoverySuite.run(DiscoverySuite.scala:30)
      at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
      at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1340)
      at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1334)
      at scala.collection.immutable.List.foreach(List.scala:392)
      at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1334)
      at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1011)
      at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1010)
      at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1500)
      at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
      at org.scalatest.tools.Runner$.main(Runner.scala:827)
      at org.scalatest.tools.Runner.main(Runner.scala)