Assert on query failed: name: The code passed to eventually never returned normally. Attempted 1306 times over 1.0633471959666667 minutes. Last failure message: clock.isStreamWaitingAt(clock.getTimeMillis()) was false.
org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:421)
 org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:439)
 org.apache.spark.sql.kafka010.KafkaSourceTest.eventually(KafkaMicroBatchSourceSuite.scala:49)
 org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:337)
 org.apache.spark.sql.kafka010.KafkaSourceTest.eventually(KafkaMicroBatchSourceSuite.scala:49)
 org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$14$$anonfun$46.apply(KafkaMicroBatchSourceSuite.scala:782)
 org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$14$$anonfun$46.apply(KafkaMicroBatchSourceSuite.scala:781)
 org.apache.spark.sql.streaming.StreamTest$Execute$$anonfun$apply$10.apply(StreamTest.scala:297)
 org.apache.spark.sql.streaming.StreamTest$Execute$$anonfun$apply$10.apply(StreamTest.scala:297)
 org.apache.spark.sql.streaming.StreamTest$$anonfun$executeAction$1$11.apply$mcZ$sp(StreamTest.scala:651)

 Caused by: clock.isStreamWaitingAt(clock.getTimeMillis()) was false
 org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:528)
 org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
 org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:501)
 org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$14$$anonfun$46$$anonfun$apply$30.apply(KafkaMicroBatchSourceSuite.scala:784)
 org.scalatest.concurrent.Eventually$class.makeAValiantAttempt$1(Eventually.scala:395)
 org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:409)
 org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:439)
 org.apache.spark.sql.kafka010.KafkaSourceTest.eventually(KafkaMicroBatchSourceSuite.scala:49)
 org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:337)
 org.apache.spark.sql.kafka010.KafkaSourceTest.eventually(KafkaMicroBatchSourceSuite.scala:49)
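
For reference, the condition that eventually keeps retrying is the batch-completion check around KafkaMicroBatchSourceSuite.scala:782-784. A minimal sketch of that pattern, assuming the suite's usual waitUntilBatchProcessed-style helper (the helper name, timeout, and structure are reconstructed from the trace, not copied from the suite):

    import org.scalatest.concurrent.Eventually._
    import org.scalatest.time.SpanSugar._
    import org.apache.spark.sql.streaming.util.StreamManualClock

    // Inside a suite extending StreamTest, which provides AssertOnQuery.
    val clock = new StreamManualClock
    val waitUntilBatchProcessed = AssertOnQuery { q =>
      eventually(timeout(60.seconds)) {  // timeout value is an assumption
        if (q.exception.isEmpty) {
          // The assert that reports "clock.isStreamWaitingAt(...) was false":
          // it holds only once the stream has finished the current batch and
          // is parked waiting for the manual clock to reach the next trigger.
          assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
        }
      }
      if (q.exception.isDefined) throw q.exception.get
      true
    }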


== Progress ==
 StartStream(ProcessingTime(100),org.apache.spark.sql.streaming.util.StreamManualClock@66b8f244,Map(),null)
 AssertOnQuery(<condition>, name)
 CheckAnswer: 
 AssertOnQuery(<condition>, name)
 AdvanceManualClock(100)
 AssertOnQuery(<condition>, name)
 CheckNewAnswer: 
 AssertOnQuery(<condition>, name)
 AdvanceManualClock(100)
 AssertOnQuery(<condition>, name)
 CheckNewAnswer: [0],[1],[2]
 AdvanceManualClock(100)
 AssertOnQuery(<condition>, name)
 CheckNewAnswer: [3],[4]
 AssertOnQuery(<condition>, name)
 AdvanceManualClock(100)
 AssertOnQuery(<condition>, name)
 CheckNewAnswer: 
 AdvanceManualClock(100)
=> AssertOnQuery(<condition>, name)
 CheckNewAnswer: 
 AssertOnQuery(<condition>, name)
 AdvanceManualClock(100)
 AssertOnQuery(<condition>, name)
 CheckNewAnswer: [12],[13],[14]
 AdvanceManualClock(100)
 AssertOnQuery(<condition>, name)
 CheckNewAnswer: [15],[16]
 AssertOnQuery(<condition>, name)
 AdvanceManualClock(100)
 AssertOnQuery(<condition>, name)
 CheckNewAnswer: [18],[20]
 AdvanceManualClock(100)
 AssertOnQuery(<condition>, name)
 CheckNewAnswer: [22],[23]
 AdvanceManualClock(100)
 AssertOnQuery(<condition>, name)
 CheckNewAnswer: 
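
The "=>" marker shows where the run stalled: the batch-completion assert right after the sixth AdvanceManualClock(100). Read as a StreamTest sequence, the first few actions above correspond to a sketch like this (StartStream, AdvanceManualClock, CheckAnswer, and CheckNewAnswer are real StreamTest actions; mapped and waitUntilBatchProcessed are assumed names):

    testStream(mapped)(
      StartStream(Trigger.ProcessingTime(100), clock),
      waitUntilBatchProcessed,   // prints as AssertOnQuery(<condition>, name)
      CheckAnswer(),             // nothing committed before a trigger fires
      waitUntilBatchProcessed,
      AdvanceManualClock(100),   // releases exactly one ProcessingTime trigger
      waitUntilBatchProcessed,
      CheckNewAnswer(),          // that batch emitted no rows
      waitUntilBatchProcessed,
      AdvanceManualClock(100),
      waitUntilBatchProcessed,
      CheckNewAnswer(0, 1, 2)    // the [0],[1],[2] rows in sink batch 1
    )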

== Stream ==
Output Mode: Append
Stream state: {KafkaV2[Subscribe[topic-34]]: {"topic-34":{"0":9}}}
Thread state: alive
Thread stack trace: com.fasterxml.jackson.core.json.ReaderBasedJsonParser.getText(ReaderBasedJsonParser.java:280)
com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:35)
com.fasterxml.jackson.databind.deser.std.StringDeserializer.deserialize(StringDeserializer.java:10)
com.fasterxml.jackson.databind.deser.SettableBeanProperty.deserialize(SettableBeanProperty.java:530)
com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeWithErrorWrapping(BeanDeserializer.java:528)
com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:417)
com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1287)
com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:326)
com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:159)
com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4013)
com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3004)
org.apache.spark.rdd.RDDOperationScope$.fromJson(RDDOperationScope.scala:86)
org.apache.spark.rdd.RDDOperationScope$$anonfun$5.apply(RDDOperationScope.scala:137)
org.apache.spark.rdd.RDDOperationScope$$anonfun$5.apply(RDDOperationScope.scala:137)
scala.Option.map(Option.scala:146)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:137)
org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1747)
org.apache.spark.SparkContext.runJob(SparkContext.scala:2015)
org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
org.apache.spark.SparkContext.runJob(SparkContext.scala:2053)
org.apache.spark.SparkContext.runJob(SparkContext.scala:2078)
org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:961)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
org.apache.spark.rdd.RDD.withScope(RDD.scala:366)
org.apache.spark.rdd.RDD.collect(RDD.scala:960)
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:299)
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3371)
org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2720)
org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2720)
org.apache.spark.sql.Dataset$$anonfun$withAction$1.apply(Dataset.scala:3360)
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:100)
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
org.apache.spark.sql.Dataset.withAction(Dataset.scala:3356)
org.apache.spark.sql.Dataset.collect(Dataset.scala:2720)
org.apache.spark.sql.execution.streaming.MemorySink.addBatch(memory.scala:324)
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$12$$anonfun$apply$18.apply(MicroBatchExecution.scala:552)
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$12$$anonfun$apply$18.apply(MicroBatchExecution.scala:550)
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:100)
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$12.apply(MicroBatchExecution.scala:550)
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$12.apply(MicroBatchExecution.scala:550)
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:327)
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:549)
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:210)
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:178)
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:178)
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:327)
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:67)
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:178)
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:172)
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:331)
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:243)


== Sink ==
0: 
1: [0] [1] [2]
2: [3] [4]
3: 


== Plan ==
== Parsed Logical Plan ==
SerializeFromObject [input[0, int, false] AS value#2541]
+- MapElements <function1>, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#2540: int
   +- DeserializeToObject newInstance(class scala.Tuple2), obj#2539: scala.Tuple2
      +- Project [cast(key#2515 as string) AS key#2529, cast(value#2516 as string) AS value#2530]
         +- StreamingDataSourceV2Relation [key#2515, value#2516, topic#2517, partition#2518, offset#2519L, timestamp#2520, timestampType#2521], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@6ae6193f, KafkaV2[Subscribe[topic-34]], {"topic-34":{"0":9}}, {"topic-34":{"0":12}}

== Analyzed Logical Plan ==
value: int
SerializeFromObject [input[0, int, false] AS value#2541]
+- MapElements <function1>, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#2540: int
   +- DeserializeToObject newInstance(class scala.Tuple2), obj#2539: scala.Tuple2
      +- Project [cast(key#2515 as string) AS key#2529, cast(value#2516 as string) AS value#2530]
         +- StreamingDataSourceV2Relation [key#2515, value#2516, topic#2517, partition#2518, offset#2519L, timestamp#2520, timestampType#2521], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@6ae6193f, KafkaV2[Subscribe[topic-34]], {"topic-34":{"0":9}}, {"topic-34":{"0":12}}

== Optimized Logical Plan ==
SerializeFromObject [input[0, int, false] AS value#2541]
+- MapElements <function1>, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#2540: int
   +- DeserializeToObject newInstance(class scala.Tuple2), obj#2539: scala.Tuple2
      +- Project [cast(key#2515 as string) AS key#2529, cast(value#2516 as string) AS value#2530]
         +- StreamingDataSourceV2Relation [key#2515, value#2516, topic#2517, partition#2518, offset#2519L, timestamp#2520, timestampType#2521], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@6ae6193f, KafkaV2[Subscribe[topic-34]], {"topic-34":{"0":9}}, {"topic-34":{"0":12}}

== Physical Plan ==
*(1) SerializeFromObject [input[0, int, false] AS value#2541]
+- *(1) MapElements <function1>, obj#2540: int
   +- *(1) DeserializeToObject newInstance(class scala.Tuple2), obj#2539: scala.Tuple2
      +- *(1) Project [cast(key#2515 as string) AS key#2529, cast(value#2516 as string) AS value#2530]
         +- *(1) Project [key#2515, value#2516, topic#2517, partition#2518, offset#2519L, timestamp#2520, timestampType#2521]
            +- *(1) MicroBatchScan[key#2515, value#2516, topic#2517, partition#2518, offset#2519L, timestamp#2520, timestampType#2521] class org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan
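
For orientation, a query shape that yields plans like these is the standard Kafka cast-to-string-then-map pattern; a hedged reconstruction (variable names and the broker option value are assumptions, not the suite's exact code):

    import spark.implicits._

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokerAddress)  // assumed placeholder
      .option("subscribe", "topic-34")
      .load()
    // Matches the Project / DeserializeToObject / MapElements chain above and
    // the "value: int" schema shown in the analyzed plan.
    val mapped = df
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]
      .map(_._2.toInt)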

 

org.scalatest.exceptions.TestFailedException:
Assert on query failed: name: The code passed to eventually never returned normally. Attempted 1306 times over 1.0633471959666667 minutes. Last failure message: clock.isStreamWaitingAt(clock.getTimeMillis()) was false.
at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:528)
at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
at org.scalatest.Assertions$class.fail(Assertions.scala:1089)
at org.scalatest.FunSuite.fail(FunSuite.scala:1560)
at org.apache.spark.sql.streaming.StreamTest$class.failTest$1(StreamTest.scala:452)
at org.apache.spark.sql.streaming.StreamTest$class.executeAction$1(StreamTest.scala:654)
at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:778)
at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:765)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
at org.apache.spark.sql.streaming.StreamTest$class.liftedTree1$1(StreamTest.scala:765)
at org.apache.spark.sql.streaming.StreamTest$class.testStream(StreamTest.scala:764)
at org.apache.spark.sql.kafka010.KafkaSourceTest.testStream(KafkaMicroBatchSourceSuite.scala:49)
at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$14$$anonfun$apply$mcV$sp$25.apply(KafkaMicroBatchSourceSuite.scala:795)
at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$14$$anonfun$apply$mcV$sp$25.apply(KafkaMicroBatchSourceSuite.scala:794)
at org.apache.spark.sql.kafka010.KafkaTestUtils.withTranscationalProducer(KafkaTestUtils.scala:361)
at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$14.apply$mcV$sp(KafkaMicroBatchSourceSuite.scala:794)
at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$14.apply(KafkaMicroBatchSourceSuite.scala:750)
at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anonfun$14.apply(KafkaMicroBatchSourceSuite.scala:750)
at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:104)
at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
at org.apache.spark.sql.kafka010.KafkaSourceTest.org$scalatest$BeforeAndAfterEach$$super$runTest(KafkaMicroBatchSourceSuite.scala:49)
at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:221)
at org.apache.spark.sql.kafka010.KafkaSourceTest.runTest(KafkaMicroBatchSourceSuite.scala:49)
at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
at org.scalatest.Suite$class.run(Suite.scala:1147)
at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:53)
at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:53)
at org.scalatest.Suite$class.callExecuteOnSuite$1(Suite.scala:1210)
at org.scalatest.Suite$$anonfun$runNestedSuites$1.apply(Suite.scala:1257)
at org.scalatest.Suite$$anonfun$runNestedSuites$1.apply(Suite.scala:1255)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at org.scalatest.Suite$class.runNestedSuites(Suite.scala:1255)
at org.scalatest.tools.DiscoverySuite.runNestedSuites(DiscoverySuite.scala:30)
at org.scalatest.Suite$class.run(Suite.scala:1144)
at org.scalatest.tools.DiscoverySuite.run(DiscoverySuite.scala:30)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1346)
at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1340)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1340)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1011)
at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1010)
at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1506)
at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
at org.scalatest.tools.Runner$.main(Runner.scala:827)
at org.scalatest.tools.Runner.main(Runner.scala)