org.scalatest.exceptions.TestFailedException: == Results == !== Correct Answer - 3 == == Spark Answer - 0 == !struct<value:int> struct<> ![0] ![1] ![2] == Progress == StartStream(ProcessingTime(100),org.apache.spark.sql.streaming.util.StreamManualClock@24dbef55,Map(),null) AssertOnQuery(<condition>, name) CheckAnswer: AssertOnQuery(<condition>, name) AdvanceManualClock(100) AssertOnQuery(<condition>, name) CheckNewAnswer: AssertOnQuery(<condition>, name) AdvanceManualClock(100) AssertOnQuery(<condition>, name) => CheckNewAnswer: [0],[1],[2] AdvanceManualClock(100) AssertOnQuery(<condition>, name) CheckNewAnswer: [3],[4] AssertOnQuery(<condition>, name) AdvanceManualClock(100) AssertOnQuery(<condition>, name) CheckNewAnswer: AdvanceManualClock(100) AssertOnQuery(<condition>, name) CheckNewAnswer: AssertOnQuery(<condition>, name) AdvanceManualClock(100) AssertOnQuery(<condition>, name) CheckNewAnswer: [12],[13],[14] AdvanceManualClock(100) AssertOnQuery(<condition>, name) CheckNewAnswer: [15],[16] AssertOnQuery(<condition>, name) AdvanceManualClock(100) AssertOnQuery(<condition>, name) CheckNewAnswer: [18],[20] AdvanceManualClock(100) AssertOnQuery(<condition>, name) CheckNewAnswer: [22],[23] AdvanceManualClock(100) AssertOnQuery(<condition>, name) CheckNewAnswer: == Stream == Output Mode: Append Stream state: {KafkaSourceV1[Subscribe[topic-31]]: {"topic-31":{"0":0}}} Thread state: alive Thread stack trace: java.lang.Object.wait(Native Method) org.apache.spark.util.ManualClock.waitTillTime(ManualClock.scala:61) org.apache.spark.sql.streaming.util.StreamManualClock.waitTillTime(StreamManualClock.scala:34) org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:65) org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:157) org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279) org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189) == Sink == 0: == Plan == == Parsed Logical Plan == SerializeFromObject [input[0, int, false] AS value#4186] +- MapElements <function1>, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#4185: int +- DeserializeToObject newInstance(class scala.Tuple2), obj#4184: scala.Tuple2 +- Project [cast(key#4162 as string) AS key#4176, cast(value#4163 as string) AS value#4177] +- Project [key#4213 AS key#4162, value#4214 AS value#4163, topic#4215 AS topic#4164, partition#4216 AS partition#4165, offset#4217L AS offset#4166L, timestamp#4218 AS timestamp#4167, timestampType#4219 AS timestampType#4168] +- LogicalRDD [key#4213, value#4214, topic#4215, partition#4216, offset#4217L, timestamp#4218, timestampType#4219], true == Analyzed Logical Plan == value: int SerializeFromObject [input[0, int, false] AS value#4186] +- MapElements <function1>, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#4185: int +- DeserializeToObject newInstance(class scala.Tuple2), obj#4184: scala.Tuple2 +- Project [cast(key#4162 as string) AS key#4176, cast(value#4163 as string) AS value#4177] +- Project [key#4213 AS key#4162, value#4214 AS value#4163, topic#4215 AS topic#4164, partition#4216 AS partition#4165, offset#4217L AS offset#4166L, timestamp#4218 AS timestamp#4167, timestampType#4219 AS timestampType#4168] +- LogicalRDD [key#4213, value#4214, topic#4215, partition#4216, offset#4217L, timestamp#4218, timestampType#4219], true == Optimized Logical Plan == SerializeFromObject [input[0, int, false] AS value#4186] +- MapElements <function1>, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#4185: int +- DeserializeToObject newInstance(class scala.Tuple2), obj#4184: scala.Tuple2 +- Project [cast(key#4213 as string) AS key#4176, cast(value#4214 as string) AS value#4177] +- LogicalRDD [key#4213, value#4214, topic#4215, partition#4216, offset#4217L, timestamp#4218, timestampType#4219], true == Physical Plan == *(1) SerializeFromObject [input[0, int, false] AS value#4186] +- *(1) MapElements <function1>, obj#4185: int +- *(1) DeserializeToObject newInstance(class scala.Tuple2), obj#4184: scala.Tuple2 +- *(1) Project [cast(key#4213 as string) AS key#4176, cast(value#4214 as string) AS value#4177] +- Scan ExistingRDD kafka[key#4213,value#4214,topic#4215,partition#4216,offset#4217L,timestamp#4218,timestampType#4219]

sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: 

== Results ==
!== Correct Answer - 3 ==   == Spark Answer - 0 ==
!struct<value:int>          struct<>
![0]                        
![1]                        
![2]                        
    

== Progress ==
   StartStream(ProcessingTime(100),org.apache.spark.sql.streaming.util.StreamManualClock@24dbef55,Map(),null)
   AssertOnQuery(<condition>, name)
   CheckAnswer: 
   AssertOnQuery(<condition>, name)
   AdvanceManualClock(100)
   AssertOnQuery(<condition>, name)
   CheckNewAnswer: 
   AssertOnQuery(<condition>, name)
   AdvanceManualClock(100)
   AssertOnQuery(<condition>, name)
=> CheckNewAnswer: [0],[1],[2]
   AdvanceManualClock(100)
   AssertOnQuery(<condition>, name)
   CheckNewAnswer: [3],[4]
   AssertOnQuery(<condition>, name)
   AdvanceManualClock(100)
   AssertOnQuery(<condition>, name)
   CheckNewAnswer: 
   AdvanceManualClock(100)
   AssertOnQuery(<condition>, name)
   CheckNewAnswer: 
   AssertOnQuery(<condition>, name)
   AdvanceManualClock(100)
   AssertOnQuery(<condition>, name)
   CheckNewAnswer: [12],[13],[14]
   AdvanceManualClock(100)
   AssertOnQuery(<condition>, name)
   CheckNewAnswer: [15],[16]
   AssertOnQuery(<condition>, name)
   AdvanceManualClock(100)
   AssertOnQuery(<condition>, name)
   CheckNewAnswer: [18],[20]
   AdvanceManualClock(100)
   AssertOnQuery(<condition>, name)
   CheckNewAnswer: [22],[23]
   AdvanceManualClock(100)
   AssertOnQuery(<condition>, name)
   CheckNewAnswer: 

== Stream ==
Output Mode: Append
Stream state: {KafkaSourceV1[Subscribe[topic-31]]: {"topic-31":{"0":0}}}
Thread state: alive
Thread stack trace: java.lang.Object.wait(Native Method)
org.apache.spark.util.ManualClock.waitTillTime(ManualClock.scala:61)
org.apache.spark.sql.streaming.util.StreamManualClock.waitTillTime(StreamManualClock.scala:34)
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:65)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:157)
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)


== Sink ==
0: 


== Plan ==
== Parsed Logical Plan ==
SerializeFromObject [input[0, int, false] AS value#4186]
+- MapElements <function1>, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#4185: int
   +- DeserializeToObject newInstance(class scala.Tuple2), obj#4184: scala.Tuple2
      +- Project [cast(key#4162 as string) AS key#4176, cast(value#4163 as string) AS value#4177]
         +- Project [key#4213 AS key#4162, value#4214 AS value#4163, topic#4215 AS topic#4164, partition#4216 AS partition#4165, offset#4217L AS offset#4166L, timestamp#4218 AS timestamp#4167, timestampType#4219 AS timestampType#4168]
            +- LogicalRDD [key#4213, value#4214, topic#4215, partition#4216, offset#4217L, timestamp#4218, timestampType#4219], true

== Analyzed Logical Plan ==
value: int
SerializeFromObject [input[0, int, false] AS value#4186]
+- MapElements <function1>, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#4185: int
   +- DeserializeToObject newInstance(class scala.Tuple2), obj#4184: scala.Tuple2
      +- Project [cast(key#4162 as string) AS key#4176, cast(value#4163 as string) AS value#4177]
         +- Project [key#4213 AS key#4162, value#4214 AS value#4163, topic#4215 AS topic#4164, partition#4216 AS partition#4165, offset#4217L AS offset#4166L, timestamp#4218 AS timestamp#4167, timestampType#4219 AS timestampType#4168]
            +- LogicalRDD [key#4213, value#4214, topic#4215, partition#4216, offset#4217L, timestamp#4218, timestampType#4219], true

== Optimized Logical Plan ==
SerializeFromObject [input[0, int, false] AS value#4186]
+- MapElements <function1>, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#4185: int
   +- DeserializeToObject newInstance(class scala.Tuple2), obj#4184: scala.Tuple2
      +- Project [cast(key#4213 as string) AS key#4176, cast(value#4214 as string) AS value#4177]
         +- LogicalRDD [key#4213, value#4214, topic#4215, partition#4216, offset#4217L, timestamp#4218, timestampType#4219], true

== Physical Plan ==
*(1) SerializeFromObject [input[0, int, false] AS value#4186]
+- *(1) MapElements <function1>, obj#4185: int
   +- *(1) DeserializeToObject newInstance(class scala.Tuple2), obj#4184: scala.Tuple2
      +- *(1) Project [cast(key#4213 as string) AS key#4176, cast(value#4214 as string) AS value#4177]
         +- Scan ExistingRDD kafka[key#4213,value#4214,topic#4215,partition#4216,offset#4217L,timestamp#4218,timestampType#4219]
         
         
	at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:528)
	at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
	at org.scalatest.Assertions$class.fail(Assertions.scala:1089)
	at org.scalatest.FunSuite.fail(FunSuite.scala:1560)
	at org.apache.spark.sql.streaming.StreamTest$class.failTest$1(StreamTest.scala:450)
	at org.apache.spark.sql.streaming.StreamTest$$anonfun$executeAction$1$20.apply(StreamTest.scala:751)
	at org.apache.spark.sql.streaming.StreamTest$$anonfun$executeAction$1$20.apply(StreamTest.scala:751)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.sql.streaming.StreamTest$class.executeAction$1(StreamTest.scala:750)
	at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:770)
	at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:757)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
	at org.apache.spark.sql.streaming.StreamTest$class.liftedTree1$1(StreamTest.scala:757)
	at org.apache.spark.sql.streaming.StreamTest$class.testStream(StreamTest.scala:756)
	at org.apache.spark.sql.kafka010.KafkaSourceTest.testStream(KafkaMicroBatchSourceSuite.scala:49)
	at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$11$$anonfun$apply$mcV$sp$14.apply(KafkaMicroBatchSourceSuite.scala:657)
	at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$11$$anonfun$apply$mcV$sp$14.apply(KafkaMicroBatchSourceSuite.scala:656)
	at org.apache.spark.sql.kafka010.KafkaTestUtils.withTranscationalProducer(KafkaTestUtils.scala:357)
	at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$11.apply$mcV$sp(KafkaMicroBatchSourceSuite.scala:656)
	at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$11.apply(KafkaMicroBatchSourceSuite.scala:613)
	at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$11.apply(KafkaMicroBatchSourceSuite.scala:613)
	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
	at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:103)
	at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
	at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
	at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
	at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
	at org.apache.spark.sql.kafka010.KafkaSourceTest.org$scalatest$BeforeAndAfterEach$$super$runTest(KafkaMicroBatchSourceSuite.scala:49)
	at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:221)
	at org.apache.spark.sql.kafka010.KafkaSourceTest.runTest(KafkaMicroBatchSourceSuite.scala:49)
	at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
	at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
	at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
	at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
	at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
	at org.scalatest.Suite$class.run(Suite.scala:1147)
	at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
	at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
	at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
	at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:52)
	at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
	at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
	at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:52)
	at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
	at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:480)
	at sbt.ForkMain$Run$2.call(ForkMain.java:296)
	at sbt.ForkMain$Run$2.call(ForkMain.java:286)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)