sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: 
Error adding data: The code passed to eventually never returned normally. Attempted 308 times over 1.0025132691333334 minutes. Last failure message: assertion failed: /brokers/topics/stress4 still exists.
org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:421)
	org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:439)
	org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:479)
	org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:308)
	org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:479)
	org.apache.spark.sql.kafka010.KafkaTestUtils.verifyTopicDeletionWithRetries(KafkaTestUtils.scala:413)
	org.apache.spark.sql.kafka010.KafkaTestUtils.deleteTopic(KafkaTestUtils.scala:226)
	org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$anonfun$21$$anonfun$apply$mcV$sp$36$$anonfun$56.apply(KafkaMicroBatchSourceSuite.scala:1468)
	org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$anonfun$21$$anonfun$apply$mcV$sp$36$$anonfun$56.apply(KafkaMicroBatchSourceSuite.scala:1465)
	org.apache.spark.sql.kafka010.KafkaSourceTest$AddKafkaData$$anonfun$addData$2.apply(KafkaMicroBatchSourceSuite.scala:110)

	Caused by: 	assertion failed: /brokers/topics/stress4 still exists
	scala.Predef$.assert(Predef.scala:170)
		org.apache.spark.sql.kafka010.KafkaTestUtils.org$apache$spark$sql$kafka010$KafkaTestUtils$$verifyTopicDeletion(KafkaTestUtils.scala:385)
		org.apache.spark.sql.kafka010.KafkaTestUtils$$anonfun$verifyTopicDeletionWithRetries$1.apply$mcV$sp(KafkaTestUtils.scala:415)
		org.apache.spark.sql.kafka010.KafkaTestUtils$$anonfun$verifyTopicDeletionWithRetries$1.apply(KafkaTestUtils.scala:414)
		org.apache.spark.sql.kafka010.KafkaTestUtils$$anonfun$verifyTopicDeletionWithRetries$1.apply(KafkaTestUtils.scala:414)
		org.scalatest.concurrent.Eventually$class.makeAValiantAttempt$1(Eventually.scala:395)
		org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:409)
		org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:439)
		org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:479)
		org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:308)


== Progress ==
   AssertOnQuery(<condition>, )
   AddKafkaData(topics = Set(stress4, stress2, stress1, stress5, stress3), data = Range(0, 1, 2, 3), message = )
   AddKafkaData(topics = Set(stress1, stress2, stress4, stress5), data = Range(4, 5, 6, 7, 8, 9, 10), message = Delete topic stress3)
   AddKafkaData(topics = Set(stress1, stress4, stress5), data = Range(11, 12, 13, 14, 15), message = Delete topic stress2)
   CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16]
   StopStream
   StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@6e4e3873,Map(),null)
   AddKafkaData(topics = Set(stress1, stress4, stress5), data = Range(16, 17), message = )
   CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18]
   StopStream
   AddKafkaData(topics = Set(stress1, stress4, stress5), data = Range(18, 19, 20, 21, 22), message = Add partition)
   AddKafkaData(topics = Set(stress1, stress4, stress5), data = Range(23, 24, 25, 26, 27, 28, 29), message = )
   AddKafkaData(topics = Set(stress1, stress4, stress5), data = Range(30, 31), message = )
   AddKafkaData(topics = Set(stress1, stress4, stress5), data = Range(), message = Add partition)
   AddKafkaData(topics = Set(stress1, stress4, stress5), data = Range(32, 33, 34, 35, 36, 37, 38), message = )
   AddKafkaData(topics = Set(stress1, stress4, stress5), data = Range(39, 40, 41, 42, 43, 44, 45, 46), message = Add partition)
   AddKafkaData(topics = Set(stress1, stress4, stress5), data = Range(47, 48, 49, 50), message = )
   AddKafkaData(topics = Set(stress1, stress4, stress5), data = Range(51, 52, 53, 54, 55, 56, 57, 58), message = )
   AddKafkaData(topics = Set(stress1, stress4, stress5), data = Range(59, 60), message = )
   AddKafkaData(topics = Set(stress1, stress4, stress5), data = Range(61, 62, 63, 64, 65), message = Add partition)
   AddKafkaData(topics = Set(stress1, stress4, stress5), data = Range(66, 67, 68, 69), message = )
   AddKafkaData(topics = Set(stress1, stress4, stress5), data = Range(70, 71, 72, 73, 74, 75), message = )
   StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@3604199d,Map(),null)
   AddKafkaData(topics = Set(stress1, stress4, stress5), data = Range(76, 77, 78, 79, 80), message = Add partition)
   AddKafkaData(topics = Set(stress1, stress4, stress5), data = Range(81, 82, 83), message = )
   AddKafkaData(topics = Set(stress1, stress4, stress5), data = Range(84, 85, 86, 87, 88, 89), message = )
   AddKafkaData(topics = Set(stress1, stress4, stress5), data = Range(90, 91, 92, 93, 94, 95, 96), message = Add partition)
   AddKafkaData(topics = Set(stress1, stress4, stress5), data = Range(), message = Add partition)
   AddKafkaData(topics = Set(stress1, stress4, stress5, stress6), data = Range(97, 98, 99, 100, 101, 102, 103, 104), message = Add topic stress7)
   AddKafkaData(topics = Set(stress1, stress4, stress5, stress6), data = Range(105, 106, 107, 108, 109, 110, 111, 112), message = )
   AddKafkaData(topics = Set(stress1, stress4, stress5), data = Range(113, 114, 115, 116, 117, 118, 119), message = Delete topic stress6)
   CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120]
   StopStream
   StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@6d9b9dcd,Map(),null)
   AddKafkaData(topics = Set(stress1, stress4, stress5), data = Range(120, 121, 122, 123), message = Add partition)
   AddKafkaData(topics = Set(stress1, stress4, stress5), data = Range(124, 125, 126, 127, 128), message = Delete topic stress1)
   AddKafkaData(topics = Set(stress1, stress4, stress5), data = Range(129, 130, 131), message = Add partition)
   AddKafkaData(topics = Set(stress1, stress4), data = Range(132, 133, 134, 135, 136, 137, 138, 139), message = Delete topic stress5)
   CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125],[126],[127],[128],[129],[130],[131],[132],[133],[134],[135],[136],[137],[138],[139],[140]
   AddKafkaData(topics = Set(stress1, stress4), data = Range(140, 141), message = )
   CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125],[126],[127],[128],[129],[130],[131],[132],[133],[134],[135],[136],[137],[138],[139],[140],[141],[142]
   StopStream
   AddKafkaData(topics = Set(stress1, stress4), data = Range(142, 143, 144), message = Add partition)
   StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@76c35208,Map(),null)
   AddKafkaData(topics = Set(stress1, stress4), data = Range(145, 146, 147, 148, 149, 150, 151, 152, 153), message = Add partition)
   AddKafkaData(topics = Set(stress1, stress4), data = Range(154, 155, 156, 157, 158), message = )
   AddKafkaData(topics = Set(stress1, stress4), data = Range(159, 160, 161, 162, 163), message = Delete topic stress1)
   AddKafkaData(topics = Set(stress1, stress4), data = Range(164, 165), message = )
   AddKafkaData(topics = Set(stress1, stress4), data = Range(166, 167), message = Add partition)
=> AddKafkaData(topics = Set(stress1), data = Range(168), message = Delete topic stress4)
   CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125],[126],[127],[128],[129],[130],[131],[132],[133],[134],[135],[136],[137],[138],[139],[140],[141],[142],[143],[144],[145],[146],[147],[148],[149],[150],[151],[152],[153],[154],[155],[156],[157],[158],[159],[160],[161],[162],[163],[164],[165],[166],[167],[168],[169]
   StopStream
   AddKafkaData(topics = Set(stress1), data = Range(169, 170, 171, 172, 173), message = Add partition)
   AddKafkaData(topics = Set(stress1), data = Range(174, 175, 176, 177, 178, 179, 180, 181), message = )
   StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@e850085,Map(),null)
   AddKafkaData(topics = Set(stress1), data = Range(182, 183), message = )
   CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125],[126],[127],[128],[129],[130],[131],[132],[133],[134],[135],[136],[137],[138],[139],[140],[141],[142],[143],[144],[145],[146],[147],[148],[149],[150],[151],[152],[153],[154],[155],[156],[157],[158],[159],[160],[161],[162],[163],[164],[165],[166],[167],[168],[169],[170],[171],[172],[173],[174],[175],[176],[177],[178],[179],[180],[181],[182],[183],[184]

== Stream ==
Output Mode: Append
Stream state: {KafkaV2[SubscribePattern[stress.*]]: {"stress1":{"23":1,"32":1,"35":1,"8":3,"17":1,"26":1,"11":2,"29":0,"2":4,"20":0,"5":2,"14":2,"4":5,"13":4,"31":3,"22":2,"7":2,"16":2,"34":1,"25":2,"37":0,"10":2,"1":4,"28":0,"19":1,"36":1,"27":0,"9":4,"18":2,"12":5,"3":7,"21":1,"30":1,"15":2,"33":0,"6":3,"24":1,"0":6},"stress4":{"23":2,"50":1,"32":1,"41":1,"53":0,"17":1,"8":3,"35":1,"44":0,"26":0,"11":1,"29":0,"38":0,"47":1,"20":0,"2":0,"5":1,"14":1,"46":0,"49":0,"40":0,"13":1,"4":1,"22":1,"31":0,"16":1,"7":1,"52":1,"43":1,"25":0,"34":0,"10":0,"37":0,"1":0,"19":1,"28":0,"54":0,"45":0,"27":1,"36":0,"18":1,"9":1,"21":0,"48":0,"3":1,"12":1,"30":0,"39":0,"15":0,"42":0,"51":0,"24":0,"6":0,"33":0,"0":16}}}
Thread state: alive
Thread stack trace: sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997)
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:206)
scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222)
scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:157)
org.apache.spark.util.ThreadUtils$.awaitReady(ThreadUtils.scala:243)
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:731)
org.apache.spark.SparkContext.runJob(SparkContext.scala:2003)
org.apache.spark.SparkContext.runJob(SparkContext.scala:2024)
org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
org.apache.spark.SparkContext.runJob(SparkContext.scala:2068)
org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:959)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
org.apache.spark.rdd.RDD.withScope(RDD.scala:364)
org.apache.spark.rdd.RDD.collect(RDD.scala:958)
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:299)
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3374)
org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2783)
org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2783)
org.apache.spark.sql.Dataset$$anonfun$withAction$1.apply(Dataset.scala:3363)
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:87)
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:147)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
org.apache.spark.sql.Dataset.withAction(Dataset.scala:3359)
org.apache.spark.sql.Dataset.collect(Dataset.scala:2783)
org.apache.spark.sql.execution.streaming.MemorySink.addBatch(memory.scala:277)
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$18.apply(MicroBatchExecution.scala:535)
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:87)
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:147)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:533)
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:323)
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:59)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:532)
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:195)
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:163)
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:163)
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:323)
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:59)
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:163)
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:157)
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:280)
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:190)


== Sink ==
0: 
1: [1] [2] [3] [4]
2: [7] [9] [6] [11] [8] [5] [10]
3: [12] [13] [14] [15] [16]
4: [18] [17]
5: [38] [46] [35] [42] [66] [48] [53] [43] [23] [24] [29] [32] [58] [61] [39] [47] [34] [41] [65] [19] [25] [30] [50] [55] [67] [63] [72] [76] [68] [71] [20] [26] [57] [74] [36] [44] [52] [22] [28] [31] [49] [54] [51] [56] [60] [70] [62] [75] [33] [40] [64] [59] [21] [27] [69] [73] [37] [45]
6: 
7: [80] [81] [79] [78]
8: [77]
9: [82] [83]
10: [84]
11: [85]
12: [88] [89] [86] [87] [90]
13: 
14: [93] [95] [92] [97] [94] [96] [91]
15: 
16: 
17: [107] [106]
18: [109] [108] [113] [98] [100] [102] [104] [110] [112] [99] [101] [103] [105] [111]
19: [116] [117] [119] [114] [118] [120] [115]
20: [123] [121] [124] [122]
21: [127] [129] [126] [128] [125]
22: [130] [132] [131]
23: 
24: [133] [135] [137] [138] [139] [134] [140] [136]
25: [141] [142]
26: [143] [144] [145]
27: [147] [146]
28: [152] [150] [153] [154] [149] [151] [148]
29: 
30: [159] [156] [157] [158] [155]
31: [160] [161]
32: [163] [162] [164]
33: [166] [165]
34: [168]


== Plan ==
== Parsed Logical Plan ==
SerializeFromObject [input[0, int, false] AS value#8180]
+- MapElements <function1>, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#8179: int
   +- DeserializeToObject newInstance(class scala.Tuple2), obj#8178: scala.Tuple2
      +- Project [cast(key#8156 as string) AS key#8170, cast(value#8157 as string) AS value#8171]
         +- Project [key#8520 AS key#8156, value#8521 AS value#8157, topic#8522 AS topic#8158, partition#8523 AS partition#8159, offset#8524L AS offset#8160L, timestamp#8525 AS timestamp#8161, timestampType#8526 AS timestampType#8162]
            +- Streaming RelationV2 kafka[key#8520, value#8521, topic#8522, partition#8523, offset#8524L, timestamp#8525, timestampType#8526] (Options: [kafka.metadata.max.age.ms=1,failOnDataLoss=false,kafka.bootstrap.servers=127.0.0.1:36417,kafka.d...)

== Analyzed Logical Plan ==
value: int
SerializeFromObject [input[0, int, false] AS value#8180]
+- MapElements <function1>, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#8179: int
   +- DeserializeToObject newInstance(class scala.Tuple2), obj#8178: scala.Tuple2
      +- Project [cast(key#8156 as string) AS key#8170, cast(value#8157 as string) AS value#8171]
         +- Project [key#8520 AS key#8156, value#8521 AS value#8157, topic#8522 AS topic#8158, partition#8523 AS partition#8159, offset#8524L AS offset#8160L, timestamp#8525 AS timestamp#8161, timestampType#8526 AS timestampType#8162]
            +- Streaming RelationV2 kafka[key#8520, value#8521, topic#8522, partition#8523, offset#8524L, timestamp#8525, timestampType#8526] (Options: [kafka.metadata.max.age.ms=1,failOnDataLoss=false,kafka.bootstrap.servers=127.0.0.1:36417,kafka.d...)

== Optimized Logical Plan ==
SerializeFromObject [input[0, int, false] AS value#8180]
+- MapElements <function1>, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#8179: int
   +- DeserializeToObject newInstance(class scala.Tuple2), obj#8178: scala.Tuple2
      +- Project [cast(key#8520 as string) AS key#8170, cast(value#8521 as string) AS value#8171]
         +- Streaming RelationV2 kafka[key#8520, value#8521, topic#8522, partition#8523, offset#8524L, timestamp#8525, timestampType#8526] (Options: [kafka.metadata.max.age.ms=1,failOnDataLoss=false,kafka.bootstrap.servers=127.0.0.1:36417,kafka.d...)

== Physical Plan ==
*(1) SerializeFromObject [input[0, int, false] AS value#8180]
+- *(1) MapElements <function1>, obj#8179: int
   +- *(1) DeserializeToObject newInstance(class scala.Tuple2), obj#8178: scala.Tuple2
      +- *(1) Project [cast(key#8520 as string) AS key#8170, cast(value#8521 as string) AS value#8171]
         +- *(1) Project [key#8520, value#8521, topic#8522, partition#8523, offset#8524L, timestamp#8525, timestampType#8526]
            +- *(1) ScanV2 kafka[key#8520, value#8521, topic#8522, partition#8523, offset#8524L, timestamp#8525, timestampType#8526] (Options: [kafka.metadata.max.age.ms=1,failOnDataLoss=false,kafka.bootstrap.servers=127.0.0.1:36417,kafka.d...)
         
         
	at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:528)
	at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
	at org.scalatest.Assertions$class.fail(Assertions.scala:1089)
	at org.scalatest.FunSuite.fail(FunSuite.scala:1560)
	at org.apache.spark.sql.streaming.StreamTest$class.failTest$1(StreamTest.scala:453)
	at org.apache.spark.sql.streaming.StreamTest$class.executeAction$1(StreamTest.scala:719)
	at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:773)
	at org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:760)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.sql.streaming.StreamTest$class.liftedTree1$1(StreamTest.scala:760)
	at org.apache.spark.sql.streaming.StreamTest$class.testStream(StreamTest.scala:759)
	at org.apache.spark.sql.kafka010.KafkaSourceTest.testStream(KafkaMicroBatchSourceSuite.scala:49)
	at org.apache.spark.sql.streaming.StreamTest$class.runStressTest(StreamTest.scala:871)
	at org.apache.spark.sql.kafka010.KafkaSourceTest.runStressTest(KafkaMicroBatchSourceSuite.scala:49)
	at org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$anonfun$21.apply$mcV$sp(KafkaMicroBatchSourceSuite.scala:1444)
	at org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$anonfun$21.apply(KafkaMicroBatchSourceSuite.scala:1423)
	at org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$anonfun$21.apply(KafkaMicroBatchSourceSuite.scala:1423)
	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
	at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:103)
	at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
	at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
	at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
	at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
	at org.apache.spark.sql.kafka010.KafkaSourceTest.org$scalatest$BeforeAndAfterEach$$super$runTest(KafkaMicroBatchSourceSuite.scala:49)
	at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:221)
	at org.apache.spark.sql.kafka010.KafkaSourceTest.runTest(KafkaMicroBatchSourceSuite.scala:49)
	at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
	at org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
	at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
	at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
	at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
	at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
	at org.scalatest.Suite$class.run(Suite.scala:1147)
	at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
	at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
	at org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
	at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:52)
	at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
	at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
	at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:52)
	at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
	at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:480)
	at sbt.ForkMain$Run$2.call(ForkMain.java:296)
	at sbt.ForkMain$Run$2.call(ForkMain.java:286)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)