sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: 
Timed out waiting for stream: The code passed to failAfter did not complete within 30 seconds.
java.lang.Thread.getStackTrace(Thread.java:1552)
	org.scalatest.concurrent.TimeLimits$class.failAfterImpl(TimeLimits.scala:234)
	org.apache.spark.sql.kafka010.KafkaSourceTest.failAfterImpl(KafkaMicroBatchSourceSuite.scala:52)
	org.scalatest.concurrent.TimeLimits$class.failAfter(TimeLimits.scala:230)
	org.apache.spark.sql.kafka010.KafkaSourceTest.failAfter(KafkaMicroBatchSourceSuite.scala:52)
	org.apache.spark.sql.streaming.StreamTest$$anonfun$fetchStreamAnswer$1$5.apply(StreamTest.scala:467)
	org.apache.spark.sql.streaming.StreamTest$$anonfun$fetchStreamAnswer$1$5.apply(StreamTest.scala:466)
	scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
	scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:130)
	scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:236)

	Caused by: 	null
	java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2151)
		org.apache.spark.sql.execution.streaming.StreamExecution.processAllAvailable(StreamExecution.scala:451)
		org.apache.spark.sql.streaming.StreamTest$$anonfun$fetchStreamAnswer$1$5$$anonfun$apply$3.apply$mcV$sp(StreamTest.scala:471)
		org.apache.spark.sql.streaming.StreamTest$$anonfun$fetchStreamAnswer$1$5$$anonfun$apply$3.apply(StreamTest.scala:467)
		org.apache.spark.sql.streaming.StreamTest$$anonfun$fetchStreamAnswer$1$5$$anonfun$apply$3.apply(StreamTest.scala:467)
		org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
		org.scalatest.concurrent.TimeLimits$class.failAfterImpl(TimeLimits.scala:235)
		org.apache.spark.sql.kafka010.KafkaSourceTest.failAfterImpl(KafkaMicroBatchSourceSuite.scala:52)
		org.scalatest.concurrent.TimeLimits$class.failAfter(TimeLimits.scala:230)
		org.apache.spark.sql.kafka010.KafkaSourceTest.failAfter(KafkaMicroBatchSourceSuite.scala:52)


== Progress ==
   AssertOnQuery(<condition>, )
   AddKafkaData(topics = Set(topic-30), data = WrappedArray(1, 2), message = )
   CheckAnswer: [1,1,1],[2,2,2]
   AddKafkaData(topics = Set(topic-30), data = WrappedArray(6, 3), message = )
=> CheckAnswer: [1,1,1],[2,2,2],[3,3,3],[1,6,1],[1,1,6],[1,6,6]
   AssertOnQuery(<condition>, )

== Stream ==
Output Mode: Append
Stream state: {KafkaV2[Subscribe[topic-30]]: {"topic-30":{"1":2,"0":2}}}
Thread state: alive
Thread stack trace: java.lang.Thread.sleep(Native Method)
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:212)
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:161)
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)


== Sink ==
0: 
1: [1,1,1]
2: [2,2,2]
3: [1,6,1] [1,1,6] [1,6,6]
4: [3,3,3]


== Plan ==
== Parsed Logical Plan ==
Project [key#10023, value#10022, value#10026]
+- Join Inner, (key#10023 = key#10027)
   :- Project [cast(cast(value#10009 as string) as int) AS value#10022, (cast(cast(value#10009 as string) as int) % 5) AS key#10023]
   :  +- Project [key#10178 AS key#10008, value#10179 AS value#10009, topic#10180 AS topic#10010, partition#10181 AS partition#10011, offset#10182L AS offset#10012L, timestamp#10183 AS timestamp#10013, timestampType#10184 AS timestampType#10014]
   :     +- Streaming RelationV2 kafka[key#10178, value#10179, topic#10180, partition#10181, offset#10182L, timestamp#10183, timestampType#10184] (Options: [subscribe=topic-30,kafka.metadata.max.age.ms=1,kafka.bootstrap.servers=localhost:40999])
   +- Project [cast(cast(value#10009 as string) as int) AS value#10026, (cast(cast(value#10009 as string) as int) % 5) AS key#10027]
      +- Project [key#10178 AS key#10008, value#10179 AS value#10009, topic#10180 AS topic#10010, partition#10181 AS partition#10011, offset#10182L AS offset#10012L, timestamp#10183 AS timestamp#10013, timestampType#10184 AS timestampType#10014]
         +- Streaming RelationV2 kafka[key#10178, value#10179, topic#10180, partition#10181, offset#10182L, timestamp#10183, timestampType#10184] (Options: [subscribe=topic-30,kafka.metadata.max.age.ms=1,kafka.bootstrap.servers=localhost:40999])

== Analyzed Logical Plan ==
key: int, value: int, value: int
Project [key#10023, value#10022, value#10026]
+- Join Inner, (key#10023 = key#10027)
   :- Project [cast(cast(value#10009 as string) as int) AS value#10022, (cast(cast(value#10009 as string) as int) % 5) AS key#10023]
   :  +- Project [key#10178 AS key#10008, value#10179 AS value#10009, topic#10180 AS topic#10010, partition#10181 AS partition#10011, offset#10182L AS offset#10012L, timestamp#10183 AS timestamp#10013, timestampType#10184 AS timestampType#10014]
   :     +- Streaming RelationV2 kafka[key#10178, value#10179, topic#10180, partition#10181, offset#10182L, timestamp#10183, timestampType#10184] (Options: [subscribe=topic-30,kafka.metadata.max.age.ms=1,kafka.bootstrap.servers=localhost:40999])
   +- Project [cast(cast(value#10009 as string) as int) AS value#10026, (cast(cast(value#10009 as string) as int) % 5) AS key#10027]
      +- Project [key#10178 AS key#10008, value#10179 AS value#10009, topic#10180 AS topic#10010, partition#10181 AS partition#10011, offset#10182L AS offset#10012L, timestamp#10183 AS timestamp#10013, timestampType#10184 AS timestampType#10014]
         +- Streaming RelationV2 kafka[key#10178, value#10179, topic#10180, partition#10181, offset#10182L, timestamp#10183, timestampType#10184] (Options: [subscribe=topic-30,kafka.metadata.max.age.ms=1,kafka.bootstrap.servers=localhost:40999])

== Optimized Logical Plan ==
Project [key#10023, value#10022, value#10026]
+- Join Inner, (key#10023 = key#10027)
   :- Project [cast(cast(value#10179 as string) as int) AS value#10022, (cast(cast(value#10179 as string) as int) % 5) AS key#10023]
   :  +- Filter isnotnull((cast(cast(value#10179 as string) as int) % 5))
   :     +- Streaming RelationV2 kafka[key#10178, value#10179, topic#10180, partition#10181, offset#10182L, timestamp#10183, timestampType#10184] (Options: [subscribe=topic-30,kafka.metadata.max.age.ms=1,kafka.bootstrap.servers=localhost:40999])
   +- Project [cast(cast(value#10179 as string) as int) AS value#10026, (cast(cast(value#10179 as string) as int) % 5) AS key#10027]
      +- Filter isnotnull((cast(cast(value#10179 as string) as int) % 5))
         +- Streaming RelationV2 kafka[key#10178, value#10179, topic#10180, partition#10181, offset#10182L, timestamp#10183, timestampType#10184] (Options: [subscribe=topic-30,kafka.metadata.max.age.ms=1,kafka.bootstrap.servers=localhost:40999])

== Physical Plan ==
*(3) Project [key#10023, value#10022, value#10026]
+- StreamingSymmetricHashJoin [key#10023], [key#10027], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = file:/home/jenkins/workspace/spark-master-test-sbt-hadoop-2.6/target/tmp/streaming.metadata-39583ee1-c102-44d5-95f0-a609a42df18a/state, runId = 241ad294-5c84-4a91-9c0d-c747ca862bc4, opId = 0, ver = 4, numPartitions = 5], 0, state cleanup [ left = null, right = null ]
   :- Exchange hashpartitioning(key#10023, 5)
   :  +- *(1) Project [cast(cast(value#10179 as string) as int) AS value#10022, (cast(cast(value#10179 as string) as int) % 5) AS key#10023]
   :     +- *(1) Filter isnotnull((cast(cast(value#10179 as string) as int) % 5))
   :        +- *(1) ScanV2 kafka[key#10178, value#10179, topic#10180, partition#10181, offset#10182L, timestamp#10183, timestampType#10184] (Options: [subscribe=topic-30,kafka.metadata.max.age.ms=1,kafka.bootstrap.servers=localhost:40999])
   +- ReusedExchange [value#10026, key#10027], Exchange hashpartitioning(key#10023, 5)
         
         
	at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:528)
	at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
	at org.scalatest.Assertions$class.fail(Assertions.scala:1089)
	at org.scalatest.FunSuite.fail(FunSuite.scala:1560)
	at org.apache.spark.sql.streaming.StreamTest$class.failTest$1(StreamTest.scala:448)
	at org.apache.spark.sql.streaming.StreamTest$class.liftedTree1$1(StreamTest.scala:775)
	at org.apache.spark.sql.streaming.StreamTest$class.testStream(StreamTest.scala:751)
	at org.apache.spark.sql.kafka010.KafkaSourceTest.testStream(KafkaMicroBatchSourceSuite.scala:52)
	at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$10.apply$mcV$sp(KafkaMicroBatchSourceSuite.scala:585)
	at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$10.apply(KafkaMicroBatchSourceSuite.scala:566)
	at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$10.apply(KafkaMicroBatchSourceSuite.scala:566)
	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
	at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:103)
	at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
	at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
	at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
	at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
	at org.apache.spark.sql.kafka010.KafkaSourceTest.org$scalatest$BeforeAndAfterEach$$super$runTest(KafkaMicroBatchSourceSuite.scala:52)
	at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:221)
	at org.apache.spark.sql.kafka010.KafkaSourceTest.runTest(KafkaMicroBatchSourceSuite.scala:52)
	at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
	at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
	at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
	at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
	at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
	at org.scalatest.Suite$class.run(Suite.scala:1147)
	at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
	at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
	at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
	at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:52)
	at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
	at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
	at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:52)
	at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
	at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:480)
	at sbt.ForkMain$Run$2.call(ForkMain.java:296)
	at sbt.ForkMain$Run$2.call(ForkMain.java:286)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
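
For context, the failure above comes from a stream-stream inner self-join over a single Kafka topic, with the test timing out at the highlighted (=>) CheckAnswer step. The Scala sketch below is a best-effort reconstruction of that query and action sequence from the "== Progress ==" section and the logical plans; it assumes Spark's StreamTest/KafkaSourceTest harness (testStream, AddKafkaData, CheckAnswer, and an embedded broker exposed through a hypothetical testUtils.brokerAddress), so treat it as illustrative rather than the suite's exact source.

// Hedged reconstruction; names from the test harness (testStream, AddKafkaData,
// CheckAnswer, testUtils, topic) are assumptions, not taken from this log.
val kafka = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", testUtils.brokerAddress) // localhost:40999 in the log
  .option("kafka.metadata.max.age.ms", "1")
  .option("subscribe", topic)                                 // topic-30 in the log
  .load()

// Matches the projections in the logical plan: value cast to INT, key = value % 5.
val values = kafka.selectExpr(
  "CAST(CAST(value AS STRING) AS INT) AS value",
  "CAST(CAST(value AS STRING) AS INT) % 5 AS key")

// Stream-stream inner self-join on key; output schema is (key, value, value),
// as shown under "== Analyzed Logical Plan ==".
val joined = values.join(values, "key")

testStream(joined)(
  AddKafkaData(Set(topic), 1, 2),
  CheckAnswer((1, 1, 1), (2, 2, 2)),
  AddKafkaData(Set(topic), 6, 3),
  // The "=>" marker in the progress trace points at this check: not all of the
  // expected rows for 3 and 6 arrived before the 30-second failAfter timeout.
  CheckAnswer((1, 1, 1), (2, 2, 2), (3, 3, 3), (1, 6, 1), (1, 1, 6), (1, 6, 6))
)

Under this reading, the sink contents in the log ([1,1,1], [2,2,2], [3,3,3], [1,6,1], [1,1,6], [1,6,6]) actually match the expected answer, which suggests the stream simply did not finish processing within the failAfter window rather than producing wrong results.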