sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException:
== Results ==
!== Correct Answer - 3 == == Spark Answer - 0 ==
!struct<value:int> struct<>
![0]
![1]
![2]
== Progress ==
StartStream(ProcessingTime(100),org.apache.spark.sql.streaming.util.StreamManualClock@24dbef55,Map(),null)
AssertOnQuery(<condition>, name)
CheckAnswer:
AssertOnQuery(<condition>, name)
AdvanceManualClock(100)
AssertOnQuery(<condition>, name)
CheckNewAnswer:
AssertOnQuery(<condition>, name)
AdvanceManualClock(100)
AssertOnQuery(<condition>, name)
=> CheckNewAnswer: [0],[1],[2]
AdvanceManualClock(100)
AssertOnQuery(<condition>, name)
CheckNewAnswer: [3],[4]
AssertOnQuery(<condition>, name)
AdvanceManualClock(100)
AssertOnQuery(<condition>, name)
CheckNewAnswer:
AdvanceManualClock(100)
AssertOnQuery(<condition>, name)
CheckNewAnswer:
AssertOnQuery(<condition>, name)
AdvanceManualClock(100)
AssertOnQuery(<condition>, name)
CheckNewAnswer: [12],[13],[14]
AdvanceManualClock(100)
AssertOnQuery(<condition>, name)
CheckNewAnswer: [15],[16]
AssertOnQuery(<condition>, name)
AdvanceManualClock(100)
AssertOnQuery(<condition>, name)
CheckNewAnswer: [18],[20]
AdvanceManualClock(100)
AssertOnQuery(<condition>, name)
CheckNewAnswer: [22],[23]
AdvanceManualClock(100)
AssertOnQuery(<condition>, name)
CheckNewAnswer:
== Stream ==
Output Mode: Append
Stream state: {KafkaSourceV1[Subscribe[topic-31]]: {"topic-31":{"0":0}}}
Thread state: alive
Thread stack trace: java.lang.Object.wait(Native Method)
org.apache.spark.util.ManualClock.waitTillTime(ManualClock.scala:61)
org.apache.spark.sql.streaming.util.StreamManualClock.waitTillTime(StreamManualClock.scala:34)
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:65)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:157)
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
== Sink ==
0:
== Plan ==
== Parsed Logical Plan ==
SerializeFromObject [input[0, int, false] AS value#4186]
+- MapElements <function1>, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#4185: int
   +- DeserializeToObject newInstance(class scala.Tuple2), obj#4184: scala.Tuple2
      +- Project [cast(key#4162 as string) AS key#4176, cast(value#4163 as string) AS value#4177]
         +- Project [key#4213 AS key#4162, value#4214 AS value#4163, topic#4215 AS topic#4164, partition#4216 AS partition#4165, offset#4217L AS offset#4166L, timestamp#4218 AS timestamp#4167, timestampType#4219 AS timestampType#4168]
            +- LogicalRDD [key#4213, value#4214, topic#4215, partition#4216, offset#4217L, timestamp#4218, timestampType#4219], true
== Analyzed Logical Plan ==
value: int
SerializeFromObject [input[0, int, false] AS value#4186]
+- MapElements <function1>, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#4185: int
   +- DeserializeToObject newInstance(class scala.Tuple2), obj#4184: scala.Tuple2
      +- Project [cast(key#4162 as string) AS key#4176, cast(value#4163 as string) AS value#4177]
         +- Project [key#4213 AS key#4162, value#4214 AS value#4163, topic#4215 AS topic#4164, partition#4216 AS partition#4165, offset#4217L AS offset#4166L, timestamp#4218 AS timestamp#4167, timestampType#4219 AS timestampType#4168]
            +- LogicalRDD [key#4213, value#4214, topic#4215, partition#4216, offset#4217L, timestamp#4218, timestampType#4219], true
== Optimized Logical Plan ==
SerializeFromObject [input[0, int, false] AS value#4186]
+- MapElements <function1>, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#4185: int
   +- DeserializeToObject newInstance(class scala.Tuple2), obj#4184: scala.Tuple2
      +- Project [cast(key#4213 as string) AS key#4176, cast(value#4214 as string) AS value#4177]
         +- LogicalRDD [key#4213, value#4214, topic#4215, partition#4216, offset#4217L, timestamp#4218, timestampType#4219], true
== Physical Plan ==
*(1) SerializeFromObject [input[0, int, false] AS value#4186]
+- *(1) MapElements <function1>, obj#4185: int
   +- *(1) DeserializeToObject newInstance(class scala.Tuple2), obj#4184: scala.Tuple2
      +- *(1) Project [cast(key#4213 as string) AS key#4176, cast(value#4214 as string) AS value#4177]
         +- Scan ExistingRDD kafka[key#4213,value#4214,topic#4215,partition#4216,offset#4217L,timestamp#4218,timestampType#4219]
at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:528)
at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
at org.scalatest.Assertions$class.fail(Assertions.scala:1089)
at org.scalatest.FunSuite.fail(FunSuite.scala:1560)
at org.apache.spark.sql.streaming.StreamTest$class.failTest$1(StreamTest.scala:450)
at org.apache.spark.sql.streaming.StreamTest$$anonfun$executeAction$1$20.apply(StreamTest.scala:751)
at org.apache.spark.sql.streaming.StreamTest$$anonfun$executeAction$1$20.apply(StreamTest.scala:751)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.sql.streaming.StreamTest$class.executeAction$1(StreamTest.scala:750)
at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:770)
at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:757)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
at org.apache.spark.sql.streaming.StreamTest$class.liftedTree1$1(StreamTest.scala:757)
at org.apache.spark.sql.streaming.StreamTest$class.testStream(StreamTest.scala:756)
at org.apache.spark.sql.kafka010.KafkaSourceTest.testStream(KafkaMicroBatchSourceSuite.scala:49)
at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$11$$anonfun$apply$mcV$sp$14.apply(KafkaMicroBatchSourceSuite.scala:657)
at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$11$$anonfun$apply$mcV$sp$14.apply(KafkaMicroBatchSourceSuite.scala:656)
at org.apache.spark.sql.kafka010.KafkaTestUtils.withTranscationalProducer(KafkaTestUtils.scala:357)
at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$11.apply$mcV$sp(KafkaMicroBatchSourceSuite.scala:656)
at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$11.apply(KafkaMicroBatchSourceSuite.scala:613)
at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$11.apply(KafkaMicroBatchSourceSuite.scala:613)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:103)
at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
at org.apache.spark.sql.kafka010.KafkaSourceTest.org$scalatest$BeforeAndAfterEach$$super$runTest(KafkaMicroBatchSourceSuite.scala:49)
at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:221)
at org.apache.spark.sql.kafka010.KafkaSourceTest.runTest(KafkaMicroBatchSourceSuite.scala:49)
at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
at org.scalatest.Suite$class.run(Suite.scala:1147)
at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:52)
at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:52)
at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:480)
at sbt.ForkMain$Run$2.call(ForkMain.java:296)
at sbt.ForkMain$Run$2.call(ForkMain.java:286)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
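
To make the "== Progress ==" section above easier to read: the actions it lists (StartStream, AdvanceManualClock, CheckNewAnswer, AssertOnQuery) come from Spark's internal StreamTest DSL, driven by a StreamManualClock so that each AdvanceManualClock(100) releases exactly one ProcessingTime(100) trigger. The "=>" marker sits on CheckNewAnswer: [0],[1],[2], i.e. the micro-batch that was expected to return rows 0, 1, 2 returned nothing ("Spark Answer - 0"). Below is a minimal sketch of how such an action sequence is typically written. It is not the actual test body (that lives in KafkaMicroBatchSourceSuite.scala, as the stack trace shows); it substitutes a MemoryStream for the transactional Kafka source so it is self-contained, and the suite and test names are illustrative.

import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.{StreamTest, Trigger}
import org.apache.spark.sql.streaming.util.StreamManualClock

// Sketch only: MemoryStream stands in for the suite's Kafka reader.
class ManualClockProgressSketch extends StreamTest {
  import testImplicits._

  test("manual-clock trigger sketch") {
    val clock = new StreamManualClock
    val input = MemoryStream[Int]

    testStream(input.toDS())(
      // Corresponds to StartStream(ProcessingTime(100), StreamManualClock, Map(), null) in the log.
      StartStream(Trigger.ProcessingTime(100), triggerClock = clock),
      AddData(input, 0, 1, 2),
      AdvanceManualClock(100),    // release one micro-batch at virtual time 100
      CheckNewAnswer(0, 1, 2),    // the step marked "=>" above: expected these rows, got none
      AddData(input, 3, 4),
      AdvanceManualClock(100),
      CheckNewAnswer(3, 4)
    )
  }
}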