sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: 
Timed out waiting for stream: The code passed to failAfter did not complete within 10 seconds.
java.lang.Thread.getStackTrace(Thread.java:1559)
	org.scalatest.concurrent.TimeLimits.failAfterImpl(TimeLimits.scala:234)
	org.scalatest.concurrent.TimeLimits.failAfterImpl$(TimeLimits.scala:233)
	org.apache.spark.ml.feature.RFormulaSuite.failAfterImpl(RFormulaSuite.scala:28)
	org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:230)
	org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:229)
	org.apache.spark.ml.feature.RFormulaSuite.failAfter(RFormulaSuite.scala:28)
	org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$7(StreamTest.scala:472)
	org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$7$adapted(StreamTest.scala:471)
	scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)

	Caused by: null
	java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
		java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2173)
		org.apache.spark.sql.execution.streaming.StreamExecution.awaitOffset(StreamExecution.scala:408)
		org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$8(StreamTest.scala:473)
		scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
		org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
		org.scalatest.concurrent.TimeLimits.failAfterImpl(TimeLimits.scala:239)
		org.scalatest.concurrent.TimeLimits.failAfterImpl$(TimeLimits.scala:233)
		org.apache.spark.ml.feature.RFormulaSuite.failAfterImpl(RFormulaSuite.scala:28)
		org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:230)


== Progress ==
   AddData to MemoryStream[_1#101184,_2#101185,_3#101186]: (1,foo,zq),(2,bar,zq),(3,bar,zy)
=> CheckAnswerByFunc

== Stream ==
Output Mode: Append
Stream state: {}
Thread state: alive
Thread stack trace: java.io.FileOutputStream.writeBytes(Native Method)
java.io.FileOutputStream.write(FileOutputStream.java:326)
sun.nio.cs.StreamEncoder.writeBytes(StreamEncoder.java:221)
sun.nio.cs.StreamEncoder.implFlushBuffer(StreamEncoder.java:291)
sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:295)
sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:141)
java.io.OutputStreamWriter.flush(OutputStreamWriter.java:229)
org.apache.log4j.helpers.QuietWriter.flush(QuietWriter.java:59)
org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:324)
org.apache.log4j.WriterAppender.append(WriterAppender.java:162)
org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
org.apache.log4j.Category.callAppenders(Category.java:206)
org.apache.log4j.Category.forcedLog(Category.java:391)
org.apache.log4j.Category.log(Category.java:856)
org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:305)
org.apache.spark.internal.Logging.logInfo(Logging.scala:58)
org.apache.spark.internal.Logging.logInfo$(Logging.scala:57)
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.logInfo(CodeGenerator.scala:1205)
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1378)
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1370)
com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257)
com.google.common.cache.LocalCache.get(LocalCache.java:4000)
com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4004)
com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1239)
org.apache.spark.sql.execution.WholeStageCodegenExec.liftedTree1$1(WholeStageCodegenExec.scala:693)
org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:692)
org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
org.apache.spark.sql.execution.SparkPlan$$Lambda$1756/1064636605.apply(Unknown Source)
org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
org.apache.spark.sql.execution.SparkPlan$$Lambda$1757/1292813939.apply(Unknown Source)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:296)
org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3302)
org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2708)
org.apache.spark.sql.Dataset$$Lambda$2061/338928580.apply(Unknown Source)
org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3291)
org.apache.spark.sql.Dataset$$Lambda$2062/339627111.apply(Unknown Source)
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
org.apache.spark.sql.execution.SQLExecution$$$Lambda$2063/71148951.apply(Unknown Source)
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:147)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
org.apache.spark.sql.Dataset.withAction(Dataset.scala:3287)
org.apache.spark.sql.Dataset.collect(Dataset.scala:2708)
org.apache.spark.sql.execution.streaming.MemorySink.addBatch(memory.scala:280)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:547)
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$2370/147230741.apply(Unknown Source)
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
org.apache.spark.sql.execution.SQLExecution$$$Lambda$2063/71148951.apply(Unknown Source)
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:147)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:74)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:545)
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$2369/1209156805.apply(Unknown Source)
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:325)
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:323)
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:60)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:545)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:203)
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$2244/726337336.apply$mcV$sp(Unknown Source)
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:325)
org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:323)
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:60)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:171)
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$2242/874718659.apply$mcZ$sp(Unknown Source)
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:165)
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:286)
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:198)


== Sink ==



== Plan ==
== Parsed Logical Plan ==
Project [id#101187, a#101188, b#101189, features#101255, label#101268]
+- Project [id#101187, a#101188, b#101189, features#101255, cast(id#101187 as double) AS label#101268]
   +- Project [id#101187, a#101188, b#101189, features#101255]
      +- Project [id#101187, a#101188, b#101189, stridx_5aa2e06b2f63#101213, stridx_0170b30612af#101222, onehot_cc558201b788#101230, onehot_76e1e95364aa#101231, features#101246 AS features#101255]
         +- Project [id#101187, a#101188, b#101189, stridx_5aa2e06b2f63#101213, stridx_0170b30612af#101222, onehot_cc558201b788#101230, onehot_76e1e95364aa#101231, UDF(named_struct(onehot_cc558201b788, onehot_cc558201b788#101230, onehot_76e1e95364aa, onehot_76e1e95364aa#101231)) AS features#101246]
            +- Filter AtLeastNNulls(n, onehot_cc558201b788#101230,onehot_76e1e95364aa#101231)
               +- Project [id#101187, a#101188, b#101189, stridx_5aa2e06b2f63#101213, stridx_0170b30612af#101222, if ((isnull(cast(stridx_5aa2e06b2f63#101213 as double)) || isnull(0))) null else UDF(cast(stridx_5aa2e06b2f63#101213 as double), 0) AS onehot_cc558201b788#101230, if ((isnull(cast(stridx_0170b30612af#101222 as double)) || isnull(1))) null else UDF(cast(stridx_0170b30612af#101222 as double), 1) AS onehot_76e1e95364aa#101231]
                  +- Project [id#101187, a#101188, b#101189, stridx_5aa2e06b2f63#101213, UDF(cast(b#101189 as string)) AS stridx_0170b30612af#101222]
                     +- Filter UDF(b#101189)
                        +- Filter AtLeastNNulls(n, b#101189)
                           +- Project [id#101187, a#101188, b#101189, UDF(cast(a#101188 as string)) AS stridx_5aa2e06b2f63#101213]
                              +- Filter UDF(a#101188)
                                 +- Filter AtLeastNNulls(n, a#101188)
                                    +- Project [id#101194 AS id#101187, a#101195 AS a#101188, b#101196 AS b#101189]
                                       +- Project [_1#101184 AS id#101194, _2#101185 AS a#101195, _3#101186 AS b#101196]
                                          +- Project [_1#101284 AS _1#101184, _2#101285 AS _2#101185, _3#101286 AS _3#101186]
                                             +- Streaming RelationV2 MemoryStreamDataSource$[_1#101284, _2#101285, _3#101286]

== Analyzed Logical Plan ==
id: int, a: string, b: string, features: vector, label: double
Project [id#101187, a#101188, b#101189, features#101255, label#101268]
+- Project [id#101187, a#101188, b#101189, features#101255, cast(id#101187 as double) AS label#101268]
   +- Project [id#101187, a#101188, b#101189, features#101255]
      +- Project [id#101187, a#101188, b#101189, stridx_5aa2e06b2f63#101213, stridx_0170b30612af#101222, onehot_cc558201b788#101230, onehot_76e1e95364aa#101231, features#101246 AS features#101255]
         +- Project [id#101187, a#101188, b#101189, stridx_5aa2e06b2f63#101213, stridx_0170b30612af#101222, onehot_cc558201b788#101230, onehot_76e1e95364aa#101231, UDF(named_struct(onehot_cc558201b788, onehot_cc558201b788#101230, onehot_76e1e95364aa, onehot_76e1e95364aa#101231)) AS features#101246]
            +- Filter AtLeastNNulls(n, onehot_cc558201b788#101230,onehot_76e1e95364aa#101231)
               +- Project [id#101187, a#101188, b#101189, stridx_5aa2e06b2f63#101213, stridx_0170b30612af#101222, if ((isnull(cast(stridx_5aa2e06b2f63#101213 as double)) || isnull(0))) null else UDF(cast(stridx_5aa2e06b2f63#101213 as double), 0) AS onehot_cc558201b788#101230, if ((isnull(cast(stridx_0170b30612af#101222 as double)) || isnull(1))) null else UDF(cast(stridx_0170b30612af#101222 as double), 1) AS onehot_76e1e95364aa#101231]
                  +- Project [id#101187, a#101188, b#101189, stridx_5aa2e06b2f63#101213, UDF(cast(b#101189 as string)) AS stridx_0170b30612af#101222]
                     +- Filter UDF(b#101189)
                        +- Filter AtLeastNNulls(n, b#101189)
                           +- Project [id#101187, a#101188, b#101189, UDF(cast(a#101188 as string)) AS stridx_5aa2e06b2f63#101213]
                              +- Filter UDF(a#101188)
                                 +- Filter AtLeastNNulls(n, a#101188)
                                    +- Project [id#101194 AS id#101187, a#101195 AS a#101188, b#101196 AS b#101189]
                                       +- Project [_1#101184 AS id#101194, _2#101185 AS a#101195, _3#101186 AS b#101196]
                                          +- Project [_1#101284 AS _1#101184, _2#101285 AS _2#101185, _3#101286 AS _3#101186]
                                             +- Streaming RelationV2 MemoryStreamDataSource$[_1#101284, _2#101285, _3#101286]

== Optimized Logical Plan ==
Project [id#101187, a#101188, b#101189, features#101246 AS features#101255, cast(id#101187 as double) AS label#101268]
+- Project [id#101187, a#101188, b#101189, UDF(named_struct(onehot_cc558201b788, UDF(stridx_5aa2e06b2f63#101213, 0), onehot_76e1e95364aa, UDF(stridx_0170b30612af#101222, 1))) AS features#101246]
   +- Filter AtLeastNNulls(n, UDF(stridx_5aa2e06b2f63#101213, 0),UDF(stridx_0170b30612af#101222, 1))
      +- Project [id#101187, a#101188, b#101189, stridx_5aa2e06b2f63#101213, UDF(b#101189) AS stridx_0170b30612af#101222]
         +- Filter (AtLeastNNulls(n, b#101189) && UDF(b#101189))
            +- Project [_1#101284 AS id#101187, _2#101285 AS a#101188, _3#101286 AS b#101189, UDF(_2#101285) AS stridx_5aa2e06b2f63#101213]
               +- Filter (AtLeastNNulls(n, _2#101285) && UDF(_2#101285))
                  +- Streaming RelationV2 MemoryStreamDataSource$[_1#101284, _2#101285, _3#101286]

== Physical Plan ==
*(1) Project [id#101187, a#101188, b#101189, features#101246 AS features#101255, cast(id#101187 as double) AS label#101268]
+- *(1) Project [id#101187, a#101188, b#101189, UDF(named_struct(onehot_cc558201b788, UDF(stridx_5aa2e06b2f63#101213, 0), onehot_76e1e95364aa, UDF(stridx_0170b30612af#101222, 1))) AS features#101246]
   +- *(1) Filter AtLeastNNulls(n, UDF(stridx_5aa2e06b2f63#101213, 0),UDF(stridx_0170b30612af#101222, 1))
      +- *(1) Project [id#101187, a#101188, b#101189, stridx_5aa2e06b2f63#101213, UDF(b#101189) AS stridx_0170b30612af#101222]
         +- *(1) Filter (AtLeastNNulls(n, b#101189) && UDF(b#101189))
            +- *(1) Project [_1#101284 AS id#101187, _2#101285 AS a#101188, _3#101286 AS b#101189, UDF(_2#101285) AS stridx_5aa2e06b2f63#101213]
               +- *(1) Filter (AtLeastNNulls(n, _2#101285) && UDF(_2#101285))
                  +- *(1) Project [_1#101284, _2#101285, _3#101286]
                     +- *(1) ScanV2 MemoryStreamDataSource$[_1#101284, _2#101285, _3#101286]

	at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:528)
	at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:527)
	at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
	at org.scalatest.Assertions.fail(Assertions.scala:1089)
	at org.scalatest.Assertions.fail$(Assertions.scala:1085)
	at org.scalatest.FunSuite.fail(FunSuite.scala:1560)
	at org.apache.spark.sql.streaming.StreamTest.failTest$1(StreamTest.scala:453)
	at org.apache.spark.sql.streaming.StreamTest.liftedTree1$1(StreamTest.scala:783)
	at org.apache.spark.sql.streaming.StreamTest.testStream(StreamTest.scala:759)
	at org.apache.spark.sql.streaming.StreamTest.testStream$(StreamTest.scala:329)
	at org.apache.spark.ml.feature.RFormulaSuite.testStream(RFormulaSuite.scala:28)
	at org.apache.spark.ml.util.MLTest.testTransformerOnStreamData(MLTest.scala:85)
	at org.apache.spark.ml.util.MLTest.testTransformerOnStreamData$(MLTest.scala:66)
	at org.apache.spark.ml.feature.RFormulaSuite.testTransformerOnStreamData(RFormulaSuite.scala:28)
	at org.apache.spark.ml.util.MLTest.testTransformerByGlobalCheckFunc(MLTest.scala:120)
	at org.apache.spark.ml.util.MLTest.testTransformerByGlobalCheckFunc$(MLTest.scala:113)
	at org.apache.spark.ml.feature.RFormulaSuite.testTransformerByGlobalCheckFunc(RFormulaSuite.scala:28)
	at org.apache.spark.ml.feature.RFormulaSuite.testRFormulaTransform(RFormulaSuite.scala:42)
	at org.apache.spark.ml.feature.RFormulaSuite.$anonfun$new$34(RFormulaSuite.scala:538)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
	at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:104)
	at org.scalatest.FunSuiteLike.invokeWithFixture$1(FunSuiteLike.scala:184)
	at org.scalatest.FunSuiteLike.$anonfun$runTest$1(FunSuiteLike.scala:196)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
	at org.scalatest.FunSuiteLike.runTest(FunSuiteLike.scala:196)
	at org.scalatest.FunSuiteLike.runTest$(FunSuiteLike.scala:178)
	at org.apache.spark.ml.feature.RFormulaSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(RFormulaSuite.scala:28)
	at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:221)
	at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:214)
	at org.apache.spark.ml.feature.RFormulaSuite.runTest(RFormulaSuite.scala:28)
	at org.scalatest.FunSuiteLike.$anonfun$runTests$1(FunSuiteLike.scala:229)
	at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:396)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
	at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:379)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
	at org.scalatest.FunSuiteLike.runTests(FunSuiteLike.scala:229)
	at org.scalatest.FunSuiteLike.runTests$(FunSuiteLike.scala:228)
	at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
	at org.scalatest.Suite.run(Suite.scala:1147)
	at org.scalatest.Suite.run$(Suite.scala:1129)
	at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
	at org.scalatest.FunSuiteLike.$anonfun$run$1(FunSuiteLike.scala:233)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
	at org.scalatest.FunSuiteLike.run(FunSuiteLike.scala:233)
	at org.scalatest.FunSuiteLike.run$(FunSuiteLike.scala:232)
	at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:53)
	at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
	at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
	at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
	at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:53)
	at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
	at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:507)
	at sbt.ForkMain$Run$2.call(ForkMain.java:296)
	at sbt.ForkMain$Run$2.call(ForkMain.java:286)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)