sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: 

== Results ==
!== Correct Answer - 6 ==       == Spark Answer - 4 ==
!struct<_1:int,_2:int,_3:int>   struct<key:int,value:int,value:int>
 [1,1,1]                        [1,1,1]
![1,1,6]                        [1,6,6]
![1,6,1]                        [2,2,2]
![1,6,6]                        [3,3,3]
![2,2,2]                        
![3,3,3]                        
    

== Progress ==
   AssertOnQuery(<condition>, )
   AddKafkaData(topics = Set(topic-30), data = WrappedArray(1, 2), message = )
   CheckAnswer: [1,1,1],[2,2,2]
   AddKafkaData(topics = Set(topic-30), data = WrappedArray(6, 3), message = )
=> CheckAnswer: [1,1,1],[2,2,2],[3,3,3],[1,6,1],[1,1,6],[1,6,6]

== Stream ==
Output Mode: Append
Stream state: {Kafka[Subscribe[topic-30]]: {"topic-30":{"1":2,"0":2}}}
Thread state: alive
Thread stack trace: java.lang.Thread.sleep(Native Method)
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:163)
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:131)
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)


== Sink ==
0: 
1: [1,1,1] [2,2,2]
2: [1,6,6]
3: [3,3,3]


== Plan ==
== Parsed Logical Plan ==
Project [key#6871, value#6870, value#6874]
+- Join Inner, (key#6871 = key#6875)
   :- Project [cast(cast(value#6857 as string) as int) AS value#6870, (cast(cast(value#6857 as string) as int) % 5) AS key#6871]
   :  +- Project [key#6979 AS key#6856, value#6980 AS value#6857, topic#6981 AS topic#6858, partition#6982 AS partition#6859, offset#6983L AS offset#6860L, timestamp#6984 AS timestamp#6861, timestampType#6985 AS timestampType#6862]
   :     +- Streaming RelationV2 kafka[key#6979, value#6980, topic#6981, partition#6982, offset#6983L, timestamp#6984, timestampType#6985] (Options: [subscribe=topic-30,kafka.metadata.max.age.ms=1,kafka.bootstrap.servers=localhost:40031])
   +- Project [cast(cast(value#6857 as string) as int) AS value#6874, (cast(cast(value#6857 as string) as int) % 5) AS key#6875]
      +- Project [key#6979 AS key#6856, value#6980 AS value#6857, topic#6981 AS topic#6858, partition#6982 AS partition#6859, offset#6983L AS offset#6860L, timestamp#6984 AS timestamp#6861, timestampType#6985 AS timestampType#6862]
         +- Streaming RelationV2 kafka[key#6979, value#6980, topic#6981, partition#6982, offset#6983L, timestamp#6984, timestampType#6985] (Options: [subscribe=topic-30,kafka.metadata.max.age.ms=1,kafka.bootstrap.servers=localhost:40031])

== Analyzed Logical Plan ==
key: int, value: int, value: int
Project [key#6871, value#6870, value#6874]
+- Join Inner, (key#6871 = key#6875)
   :- Project [cast(cast(value#6857 as string) as int) AS value#6870, (cast(cast(value#6857 as string) as int) % 5) AS key#6871]
   :  +- Project [key#6979 AS key#6856, value#6980 AS value#6857, topic#6981 AS topic#6858, partition#6982 AS partition#6859, offset#6983L AS offset#6860L, timestamp#6984 AS timestamp#6861, timestampType#6985 AS timestampType#6862]
   :     +- Streaming RelationV2 kafka[key#6979, value#6980, topic#6981, partition#6982, offset#6983L, timestamp#6984, timestampType#6985] (Options: [subscribe=topic-30,kafka.metadata.max.age.ms=1,kafka.bootstrap.servers=localhost:40031])
   +- Project [cast(cast(value#6857 as string) as int) AS value#6874, (cast(cast(value#6857 as string) as int) % 5) AS key#6875]
      +- Project [key#6979 AS key#6856, value#6980 AS value#6857, topic#6981 AS topic#6858, partition#6982 AS partition#6859, offset#6983L AS offset#6860L, timestamp#6984 AS timestamp#6861, timestampType#6985 AS timestampType#6862]
         +- Streaming RelationV2 kafka[key#6979, value#6980, topic#6981, partition#6982, offset#6983L, timestamp#6984, timestampType#6985] (Options: [subscribe=topic-30,kafka.metadata.max.age.ms=1,kafka.bootstrap.servers=localhost:40031])

== Optimized Logical Plan ==
Project [key#6871, value#6870, value#6874]
+- Join Inner, (key#6871 = key#6875)
   :- Project [cast(cast(value#6980 as string) as int) AS value#6870, (cast(cast(value#6980 as string) as int) % 5) AS key#6871]
   :  +- Filter isnotnull((cast(cast(value#6980 as string) as int) % 5))
   :     +- Streaming RelationV2 kafka[key#6979, value#6980, topic#6981, partition#6982, offset#6983L, timestamp#6984, timestampType#6985] (Options: [subscribe=topic-30,kafka.metadata.max.age.ms=1,kafka.bootstrap.servers=localhost:40031])
   +- Project [cast(cast(value#6980 as string) as int) AS value#6874, (cast(cast(value#6980 as string) as int) % 5) AS key#6875]
      +- Filter isnotnull((cast(cast(value#6980 as string) as int) % 5))
         +- Streaming RelationV2 kafka[key#6979, value#6980, topic#6981, partition#6982, offset#6983L, timestamp#6984, timestampType#6985] (Options: [subscribe=topic-30,kafka.metadata.max.age.ms=1,kafka.bootstrap.servers=localhost:40031])

== Physical Plan ==
*(3) Project [key#6871, value#6870, value#6874]
+- StreamingSymmetricHashJoin [key#6871], [key#6875], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = file:/home/jenkins/workspace/spark-master-test-sbt-hadoop-2.6-ubuntu-test/target/tmp/streaming.metadata-2ffb4ea8-aab2-4161-94de-a462ad81a5cc/state, runId = a9eb05c5-e7c6-466e-9f05-73475cb7416a, opId = 0, ver = 3, numPartitions = 5], 0, state cleanup [ left = null, right = null ]
   :- *(1) Project [cast(cast(value#6980 as string) as int) AS value#6870, (cast(cast(value#6980 as string) as int) % 5) AS key#6871]
   :  +- *(1) Filter isnotnull((cast(cast(value#6980 as string) as int) % 5))
   :     +- *(1) ScanV2 kafka[key#6979, value#6980, topic#6981, partition#6982, offset#6983L, timestamp#6984, timestampType#6985] (Options: [subscribe=topic-30,kafka.metadata.max.age.ms=1,kafka.bootstrap.servers=localhost:40031])
   +- *(2) Project [cast(cast(value#6980 as string) as int) AS value#6874, (cast(cast(value#6980 as string) as int) % 5) AS key#6875]
      +- *(2) Filter isnotnull((cast(cast(value#6980 as string) as int) % 5))
         +- *(2) ScanV2 kafka[key#6979, value#6980, topic#6981, partition#6982, offset#6983L, timestamp#6984, timestampType#6985] (Options: [subscribe=topic-30,kafka.metadata.max.age.ms=1,kafka.bootstrap.servers=localhost:40031])
         
         
	at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:528)
	at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
	at org.scalatest.Assertions$class.fail(Assertions.scala:1089)
	at org.scalatest.FunSuite.fail(FunSuite.scala:1560)
	at org.apache.spark.sql.streaming.StreamTest$class.failTest$1(StreamTest.scala:430)
	at org.apache.spark.sql.streaming.StreamTest$$anonfun$executeAction$1$18.apply(StreamTest.scala:669)
	at org.apache.spark.sql.streaming.StreamTest$$anonfun$executeAction$1$18.apply(StreamTest.scala:669)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.sql.streaming.StreamTest$class.executeAction$1(StreamTest.scala:668)
	at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:704)
	at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:693)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.sql.streaming.StreamTest$class.liftedTree1$1(StreamTest.scala:693)
	at org.apache.spark.sql.streaming.StreamTest$class.testStream(StreamTest.scala:692)
	at org.apache.spark.sql.kafka010.KafkaSourceTest.testStream(KafkaMicroBatchSourceSuite.scala:52)
	at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$10.apply$mcV$sp(KafkaMicroBatchSourceSuite.scala:585)
	at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$10.apply(KafkaMicroBatchSourceSuite.scala:566)
	at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$10.apply(KafkaMicroBatchSourceSuite.scala:566)
	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
	at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:103)
	at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
	at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
	at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
	at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
	at org.apache.spark.sql.kafka010.KafkaSourceTest.org$scalatest$BeforeAndAfterEach$$super$runTest(KafkaMicroBatchSourceSuite.scala:52)
	at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:221)
	at org.apache.spark.sql.kafka010.KafkaSourceTest.runTest(KafkaMicroBatchSourceSuite.scala:52)
	at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
	at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
	at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
	at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
	at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
	at org.scalatest.Suite$class.run(Suite.scala:1147)
	at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
	at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
	at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
	at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:52)
	at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
	at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
	at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:52)
	at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
	at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:480)
	at sbt.ForkMain$Run$2.call(ForkMain.java:296)
	at sbt.ForkMain$Run$2.call(ForkMain.java:286)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
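
For context, the failing query implied by the plans above is a Kafka stream-stream self-join. The sketch below is a reconstruction from the analyzed plan, not code copied from KafkaMicroBatchSourceSuite; the SparkSession `spark`, the broker address, and the literal use of `topic-30` are assumptions/placeholders.

    // Read the topic, derive an int value and a join key = value % 5,
    // then inner self-join on that key (output schema: key, value, value).
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")  // placeholder broker
      .option("kafka.metadata.max.age.ms", "1")
      .option("subscribe", "topic-30")
      .load()
      .selectExpr(
        "CAST(CAST(value AS STRING) AS INT) AS value",
        "CAST(CAST(value AS STRING) AS INT) % 5 AS key")

    // Shows up as StreamingSymmetricHashJoin in the physical plan above.
    val joined = df.join(df, "key")

Because 1 % 5 == 6 % 5, the self-join should pair values 1 and 6 in both directions, which is why the correct answer contains the six rows [1,1,1], [2,2,2], [3,3,3], [1,1,6], [1,6,1], [1,6,6], while the sink above only recorded four of them.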