Timed out waiting for stream: The code passed to failAfter did not complete within 60 seconds.
java.base/java.lang.Thread.getStackTrace(Thread.java:1606)
 org.scalatest.concurrent.TimeLimits.failAfterImpl(TimeLimits.scala:234)
 org.scalatest.concurrent.TimeLimits.failAfterImpl$(TimeLimits.scala:233)
 org.apache.spark.sql.streaming.StreamingOuterJoinSuite.failAfterImpl(StreamingJoinSuite.scala:424)
 org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:230)
 org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:229)
 org.apache.spark.sql.streaming.StreamingOuterJoinSuite.failAfter(StreamingJoinSuite.scala:424)
 org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$7(StreamTest.scala:463)
 org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$7$adapted(StreamTest.scala:462)
 scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)

 Caused by: null
 java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2056)
 java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2221)
 org.apache.spark.sql.execution.streaming.StreamExecution.awaitOffset(StreamExecution.scala:457)
 org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$8(StreamTest.scala:464)
 scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
 org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
 org.scalatest.concurrent.TimeLimits.failAfterImpl(TimeLimits.scala:239)
 org.scalatest.concurrent.TimeLimits.failAfterImpl$(TimeLimits.scala:233)
 org.apache.spark.sql.streaming.StreamingOuterJoinSuite.failAfterImpl(StreamingJoinSuite.scala:424)
 org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:230)


== Progress ==
 [ AddData to MemoryStream[value#726280]: 1,2,3 | AddData to MemoryStream[value#726289]: 1,2,3 ]
 CheckNewAnswer: [1,10,2,null],[2,10,4,null],[3,10,6,null]
 AssertOnQuery(<condition>, Check total state rows = List(3), updated state rows = List(3))
 [ AddData to MemoryStream[value#726280]: 20 | AddData to MemoryStream[value#726289]: 21 ]
 CheckNewAnswer: 
 AssertOnQuery(<condition>, Check total state rows = List(2), updated state rows = List(2))
 AddData to MemoryStream[value#726289]: 20
 CheckNewAnswer: [20,30,40,60]
 AssertOnQuery(<condition>, Check total state rows = List(3), updated state rows = List(1))
 [ AddData to MemoryStream[value#726280]: 40,41 | AddData to MemoryStream[value#726289]: 40,41 ]
 CheckNewAnswer: [40,50,80,120],[41,50,82,123]
 AssertOnQuery(<condition>, Check total state rows = List(4), updated state rows = List(4))
 [ AddData to MemoryStream[value#726280]: 70 | AddData to MemoryStream[value#726289]: 71 ]
 CheckNewAnswer: 
 AssertOnQuery(<condition>, Check total state rows = List(2), updated state rows = List(2))
 AddData to MemoryStream[value#726289]: 70
 CheckNewAnswer: [70,80,140,210]
 AssertOnQuery(<condition>, Check total state rows = List(3), updated state rows = List(1))
 [ AddData to MemoryStream[value#726280]: 101,102,103 | AddData to MemoryStream[value#726289]: 101,102,103 ]
=> CheckNewAnswer: 
 AssertOnQuery(<condition>, Check total state rows = List(6), updated state rows = List(3))
 [ AddData to MemoryStream[value#726280]: 1000 | AddData to MemoryStream[value#726289]: 1001 ]
 CheckNewAnswer: [101,110,202,null],[102,110,204,null],[103,110,206,null]
 AssertOnQuery(<condition>, Check total state rows = List(2), updated state rows = List(2))

== Stream ==
Output Mode: Append
Stream state: {MemoryStream[value#726289]: 5,MemoryStream[value#726280]: 3}
Thread state: alive
Thread stack trace: java.base@11.0.1/jdk.internal.misc.Unsafe.park(Native Method)
java.base@11.0.1/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
java.base@11.0.1/java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:885)
java.base@11.0.1/java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1039)
java.base@11.0.1/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1345)
app//scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:242)
app//scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:258)
app//scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:187)
app//org.apache.spark.util.ThreadUtils$.awaitReady(ThreadUtils.scala:242)
app//org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:736)
app//org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
app//org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.doWrite(WriteToDataSourceV2Exec.scala:235)
app//org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.doWrite$(WriteToDataSourceV2Exec.scala:217)
app//org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doWrite(WriteToDataSourceV2Exec.scala:180)
app//org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:187)
app//org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
app//org.apache.spark.sql.execution.SparkPlan$$Lambda$5798/0x0000000801853040.apply(Unknown Source)
app//org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
app//org.apache.spark.sql.execution.SparkPlan$$Lambda$5799/0x0000000801853440.apply(Unknown Source)
app//org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
app//org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
app//org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
app//org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:320)
app//org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:369)
app//org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3411)
app//org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2768)
app//org.apache.spark.sql.Dataset$$Lambda$6229/0x00000008019c6040.apply(Unknown Source)
app//org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3401)
app//org.apache.spark.sql.Dataset$$Lambda$5492/0x0000000801729c40.apply(Unknown Source)
app//org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$4(SQLExecution.scala:100)
app//org.apache.spark.sql.execution.SQLExecution$$$Lambda$5496/0x000000080172c840.apply(Unknown Source)
app//org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
app//org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
app//org.apache.spark.sql.Dataset.withAction(Dataset.scala:3397)
app//org.apache.spark.sql.Dataset.collect(Dataset.scala:2768)
app//org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:557)
app//org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$10067/0x0000000802821c40.apply(Unknown Source)
app//org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$4(SQLExecution.scala:100)
app//org.apache.spark.sql.execution.SQLExecution$$$Lambda$5496/0x000000080172c840.apply(Unknown Source)
app//org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
app//org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
app//org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:552)
app//org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$10066/0x0000000802821840.apply(Unknown Source)
app//org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:328)
app//org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:326)
app//org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
app//org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:552)
app//org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:213)
app//org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$9991/0x00000008027fa840.apply$mcV$sp(Unknown Source)
app//scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
app//org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:328)
app//org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:326)
app//org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
app//org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:181)
app//org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$9985/0x00000008027f8440.apply$mcZ$sp(Unknown Source)
app//org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
app//org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:175)
app//org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:332)
app//org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)


== Sink ==
0: [2,10,4,null] [3,10,6,null] [1,10,2,null]
1: 
2: 
3: [20,30,40,60]
4: [41,50,82,123] [40,50,80,120]
5: 
6: 
7: 
8: [70,80,140,210]


== Plan ==
== Parsed Logical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@46594d79
+- Project [key#726282, cast(window#726297-T10000ms.end as bigint) AS end#726326L, leftValue#726284, rightValue#726293]
 +- Join LeftOuter, ((((key#726282 = key#726291) AND (window#726297-T10000ms = window#726302-T10000ms)) AND (leftValue#726284 > 10)) AND ((rightValue#726293 < 300) OR (rightValue#726293 > 1000)))
 :- Project [key#726282, window#726298-T10000ms AS window#726297-T10000ms, leftValue#726284]
 : +- Filter isnotnull(leftTime#726283-T10000ms)
 : +- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) as double) = (cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) THEN (CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 10000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) as double) = (cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) THEN (CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 10000000) + 0) + 10000000), LongType, TimestampType)) AS window#726298-T10000ms, key#726282, leftTime#726283-T10000ms, leftValue#726284]
 : +- EventTimeWatermark leftTime#726283: timestamp, interval 10 seconds
 : +- Project [value#726280 AS key#726282, cast(value#726280 as timestamp) AS leftTime#726283, (value#726280 * 2) AS leftValue#726284]
 : +- StreamingDataSourceV2Relation [value#726280], org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder@5cf1c91, MemoryStream[value#726280], 3, 4
 +- Project [key#726291, window#726303-T10000ms AS window#726302-T10000ms, rightValue#726293]
 +- Filter isnotnull(rightTime#726292-T10000ms)
 +- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) as double) = (cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) THEN (CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 10000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) as double) = (cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) THEN (CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 10000000) + 0) + 10000000), LongType, TimestampType)) AS window#726303-T10000ms, key#726291, rightTime#726292-T10000ms, rightValue#726293]
 +- EventTimeWatermark rightTime#726292: timestamp, interval 10 seconds
 +- Project [value#726289 AS key#726291, cast(value#726289 as timestamp) AS rightTime#726292, (value#726289 * 3) AS rightValue#726293]
 +- StreamingDataSourceV2Relation [value#726289], org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder@29f58ed5, MemoryStream[value#726289], 5, 6

== Analyzed Logical Plan ==

WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@46594d79
+- Project [key#726282, cast(window#726297-T10000ms.end as bigint) AS end#726326L, leftValue#726284, rightValue#726293]
 +- Join LeftOuter, ((((key#726282 = key#726291) AND (window#726297-T10000ms = window#726302-T10000ms)) AND (leftValue#726284 > 10)) AND ((rightValue#726293 < 300) OR (rightValue#726293 > 1000)))
 :- Project [key#726282, window#726298-T10000ms AS window#726297-T10000ms, leftValue#726284]
 : +- Filter isnotnull(leftTime#726283-T10000ms)
 : +- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) as double) = (cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) THEN (CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 10000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) as double) = (cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) THEN (CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 10000000) + 0) + 10000000), LongType, TimestampType)) AS window#726298-T10000ms, key#726282, leftTime#726283-T10000ms, leftValue#726284]
 : +- EventTimeWatermark leftTime#726283: timestamp, interval 10 seconds
 : +- Project [value#726280 AS key#726282, cast(value#726280 as timestamp) AS leftTime#726283, (value#726280 * 2) AS leftValue#726284]
 : +- StreamingDataSourceV2Relation [value#726280], org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder@5cf1c91, MemoryStream[value#726280], 3, 4
 +- Project [key#726291, window#726303-T10000ms AS window#726302-T10000ms, rightValue#726293]
 +- Filter isnotnull(rightTime#726292-T10000ms)
 +- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) as double) = (cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) THEN (CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 10000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) as double) = (cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) THEN (CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 10000000) + 0) + 10000000), LongType, TimestampType)) AS window#726303-T10000ms, key#726291, rightTime#726292-T10000ms, rightValue#726293]
 +- EventTimeWatermark rightTime#726292: timestamp, interval 10 seconds
 +- Project [value#726289 AS key#726291, cast(value#726289 as timestamp) AS rightTime#726292, (value#726289 * 3) AS rightValue#726293]
 +- StreamingDataSourceV2Relation [value#726289], org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder@29f58ed5, MemoryStream[value#726289], 5, 6

== Optimized Logical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@46594d79
+- Project [key#726282, cast(window#726297-T10000ms.end as bigint) AS end#726326L, leftValue#726284, rightValue#726293]
 +- Join LeftOuter, (((leftValue#726284 > 10) AND (key#726282 = key#726291)) AND (window#726297-T10000ms = window#726302-T10000ms))
 :- Project [key#726282, named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = (cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = (cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 10000000), LongType, TimestampType)) AS window#726297-T10000ms, leftValue#726284]
 : +- EventTimeWatermark leftTime#726283: timestamp, interval 10 seconds
 : +- Project [value#726280 AS key#726282, cast(value#726280 as timestamp) AS leftTime#726283, (value#726280 * 2) AS leftValue#726284]
 : +- StreamingDataSourceV2Relation [value#726280], org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder@5cf1c91, MemoryStream[value#726280], 3, 4
 +- Project [key#726291, named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = (cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = (cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 10000000), LongType, TimestampType)) AS window#726302-T10000ms, rightValue#726293]
 +- EventTimeWatermark rightTime#726292: timestamp, interval 10 seconds
 +- Project [value#726289 AS key#726291, cast(value#726289 as timestamp) AS rightTime#726292, (value#726289 * 3) AS rightValue#726293]
 +- Filter (((value#726289 * 3) < 300) OR ((value#726289 * 3) > 1000))
 +- StreamingDataSourceV2Relation [value#726289], org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder@29f58ed5, MemoryStream[value#726289], 5, 6

== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@46594d79
+- *(5) Project [key#726282, cast(window#726297-T10000ms.end as bigint) AS end#726326L, leftValue#726284, rightValue#726293]
 +- StreamingSymmetricHashJoin [key#726282, window#726297-T10000ms], [key#726291, window#726302-T10000ms], LeftOuter, condition = [ leftOnly = (leftValue#726284 > 10), rightOnly = null, both = null, full = (leftValue#726284 > 10) ], state info [ checkpoint = file:/home/jenkins/workspace/spark-master-test-maven-hadoop-3.2-jdk-11/sql/core/target/tmp/streaming.metadata-c792c067-8711-4554-8e06-1586240169fe/state, runId = f9b87d2b-6ebb-48d9-b207-76e9cececbfc, opId = 0, ver = 9, numPartitions = 5], 60000, state cleanup [ left key predicate: (input[1, struct<start:timestamp,end:timestamp>, false].end <= 60000000), right key predicate: (input[1, struct<start:timestamp,end:timestamp>, false].end <= 60000000) ]
 :- Exchange hashpartitioning(key#726282, window#726297-T10000ms, 5)
 : +- *(2) Project [key#726282, named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = (cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = (cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 10000000), LongType, TimestampType)) AS window#726297-T10000ms, leftValue#726284]
 : +- EventTimeWatermark leftTime#726283: timestamp, interval 10 seconds
 : +- *(1) Project [value#726280 AS key#726282, cast(value#726280 as timestamp) AS leftTime#726283, (value#726280 * 2) AS leftValue#726284]
 : +- *(1) Project [value#726280]
 : +- *(1) MicroBatchScan[value#726280] MemoryStreamDataSource
 +- Exchange hashpartitioning(key#726291, window#726302-T10000ms, 5)
 +- *(4) Project [key#726291, named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = (cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = (cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 10000000), LongType, TimestampType)) AS window#726302-T10000ms, rightValue#726293]
 +- EventTimeWatermark rightTime#726292: timestamp, interval 10 seconds
 +- *(3) Project [value#726289 AS key#726291, cast(value#726289 as timestamp) AS rightTime#726292, (value#726289 * 3) AS rightValue#726293]
 +- *(3) Filter (((value#726289 * 3) < 300) OR ((value#726289 * 3) > 1000))
 +- *(3) Project [value#726289]
 +- *(3) MicroBatchScan[value#726289] MemoryStreamDataSource

 

org.scalatest.exceptions.TestFailedException:
Timed out waiting for stream: The code passed to failAfter did not complete within 60 seconds.
java.base/java.lang.Thread.getStackTrace(Thread.java:1606)
org.scalatest.concurrent.TimeLimits.failAfterImpl(TimeLimits.scala:234)
org.scalatest.concurrent.TimeLimits.failAfterImpl$(TimeLimits.scala:233)
org.apache.spark.sql.streaming.StreamingOuterJoinSuite.failAfterImpl(StreamingJoinSuite.scala:424)
org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:230)
org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:229)
org.apache.spark.sql.streaming.StreamingOuterJoinSuite.failAfter(StreamingJoinSuite.scala:424)
org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$7(StreamTest.scala:463)
org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$7$adapted(StreamTest.scala:462)
scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
Caused by: null
java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2056)
java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2221)
org.apache.spark.sql.execution.streaming.StreamExecution.awaitOffset(StreamExecution.scala:457)
org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$8(StreamTest.scala:464)
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
org.scalatest.concurrent.TimeLimits.failAfterImpl(TimeLimits.scala:239)
org.scalatest.concurrent.TimeLimits.failAfterImpl$(TimeLimits.scala:233)
org.apache.spark.sql.streaming.StreamingOuterJoinSuite.failAfterImpl(StreamingJoinSuite.scala:424)
org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:230)
== Progress ==
[ AddData to MemoryStream[value#726280]: 1,2,3 | AddData to MemoryStream[value#726289]: 1,2,3 ]
CheckNewAnswer: [1,10,2,null],[2,10,4,null],[3,10,6,null]
AssertOnQuery(<condition>, Check total state rows = List(3), updated state rows = List(3))
[ AddData to MemoryStream[value#726280]: 20 | AddData to MemoryStream[value#726289]: 21 ]
CheckNewAnswer:
AssertOnQuery(<condition>, Check total state rows = List(2), updated state rows = List(2))
AddData to MemoryStream[value#726289]: 20
CheckNewAnswer: [20,30,40,60]
AssertOnQuery(<condition>, Check total state rows = List(3), updated state rows = List(1))
[ AddData to MemoryStream[value#726280]: 40,41 | AddData to MemoryStream[value#726289]: 40,41 ]
CheckNewAnswer: [40,50,80,120],[41,50,82,123]
AssertOnQuery(<condition>, Check total state rows = List(4), updated state rows = List(4))
[ AddData to MemoryStream[value#726280]: 70 | AddData to MemoryStream[value#726289]: 71 ]
CheckNewAnswer:
AssertOnQuery(<condition>, Check total state rows = List(2), updated state rows = List(2))
AddData to MemoryStream[value#726289]: 70
CheckNewAnswer: [70,80,140,210]
AssertOnQuery(<condition>, Check total state rows = List(3), updated state rows = List(1))
[ AddData to MemoryStream[value#726280]: 101,102,103 | AddData to MemoryStream[value#726289]: 101,102,103 ]
=> CheckNewAnswer:
AssertOnQuery(<condition>, Check total state rows = List(6), updated state rows = List(3))
[ AddData to MemoryStream[value#726280]: 1000 | AddData to MemoryStream[value#726289]: 1001 ]
CheckNewAnswer: [101,110,202,null],[102,110,204,null],[103,110,206,null]
AssertOnQuery(<condition>, Check total state rows = List(2), updated state rows = List(2))
== Stream ==
Output Mode: Append
Stream state: {MemoryStream[value#726289]: 5,MemoryStream[value#726280]: 3}
Thread state: alive
Thread stack trace: java.base@11.0.1/jdk.internal.misc.Unsafe.park(Native Method)
java.base@11.0.1/java.util.concurrent.locks.LockSupport.park(LockSupport.java:194)
java.base@11.0.1/java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:885)
java.base@11.0.1/java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1039)
java.base@11.0.1/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1345)
app//scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:242)
app//scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:258)
app//scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:187)
app//org.apache.spark.util.ThreadUtils$.awaitReady(ThreadUtils.scala:242)
app//org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:736)
app//org.apache.spark.SparkContext.runJob(SparkContext.scala:2029)
app//org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.doWrite(WriteToDataSourceV2Exec.scala:235)
app//org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.doWrite$(WriteToDataSourceV2Exec.scala:217)
app//org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doWrite(WriteToDataSourceV2Exec.scala:180)
app//org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:187)
app//org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
app//org.apache.spark.sql.execution.SparkPlan$$Lambda$5798/0x0000000801853040.apply(Unknown Source)
app//org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
app//org.apache.spark.sql.execution.SparkPlan$$Lambda$5799/0x0000000801853440.apply(Unknown Source)
app//org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
app//org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
app//org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
app//org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:320)
app//org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:369)
app//org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3411)
app//org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2768)
app//org.apache.spark.sql.Dataset$$Lambda$6229/0x00000008019c6040.apply(Unknown Source)
app//org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3401)
app//org.apache.spark.sql.Dataset$$Lambda$5492/0x0000000801729c40.apply(Unknown Source)
app//org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$4(SQLExecution.scala:100)
app//org.apache.spark.sql.execution.SQLExecution$$$Lambda$5496/0x000000080172c840.apply(Unknown Source)
app//org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
app//org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
app//org.apache.spark.sql.Dataset.withAction(Dataset.scala:3397)
app//org.apache.spark.sql.Dataset.collect(Dataset.scala:2768)
app//org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:557)
app//org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$10067/0x0000000802821c40.apply(Unknown Source)
app//org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$4(SQLExecution.scala:100)
app//org.apache.spark.sql.execution.SQLExecution$$$Lambda$5496/0x000000080172c840.apply(Unknown Source)
app//org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
app//org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
app//org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:552)
app//org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$10066/0x0000000802821840.apply(Unknown Source)
app//org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:328)
app//org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:326)
app//org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
app//org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:552)
app//org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:213)
app//org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$9991/0x00000008027fa840.apply$mcV$sp(Unknown Source)
app//scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
app//org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:328)
app//org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:326)
app//org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
app//org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:181)
app//org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$9985/0x00000008027f8440.apply$mcZ$sp(Unknown Source)
app//org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
app//org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:175)
app//org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:332)
app//org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
== Sink ==
0: [2,10,4,null] [3,10,6,null] [1,10,2,null]
1:
2:
3: [20,30,40,60]
4: [41,50,82,123] [40,50,80,120]
5:
6:
7:
8: [70,80,140,210]
== Plan ==
== Parsed Logical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@46594d79
+- Project [key#726282, cast(window#726297-T10000ms.end as bigint) AS end#726326L, leftValue#726284, rightValue#726293]
+- Join LeftOuter, ((((key#726282 = key#726291) AND (window#726297-T10000ms = window#726302-T10000ms)) AND (leftValue#726284 > 10)) AND ((rightValue#726293 < 300) OR (rightValue#726293 > 1000)))
:- Project [key#726282, window#726298-T10000ms AS window#726297-T10000ms, leftValue#726284]
: +- Filter isnotnull(leftTime#726283-T10000ms)
: +- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) as double) = (cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) THEN (CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 10000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) as double) = (cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) THEN (CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 10000000) + 0) + 10000000), LongType, TimestampType)) AS window#726298-T10000ms, key#726282, leftTime#726283-T10000ms, leftValue#726284]
: +- EventTimeWatermark leftTime#726283: timestamp, interval 10 seconds
: +- Project [value#726280 AS key#726282, cast(value#726280 as timestamp) AS leftTime#726283, (value#726280 * 2) AS leftValue#726284]
: +- StreamingDataSourceV2Relation [value#726280], org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder@5cf1c91, MemoryStream[value#726280], 3, 4
+- Project [key#726291, window#726303-T10000ms AS window#726302-T10000ms, rightValue#726293]
+- Filter isnotnull(rightTime#726292-T10000ms)
+- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) as double) = (cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) THEN (CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 10000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) as double) = (cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) THEN (CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 10000000) + 0) + 10000000), LongType, TimestampType)) AS window#726303-T10000ms, key#726291, rightTime#726292-T10000ms, rightValue#726293]
+- EventTimeWatermark rightTime#726292: timestamp, interval 10 seconds
+- Project [value#726289 AS key#726291, cast(value#726289 as timestamp) AS rightTime#726292, (value#726289 * 3) AS rightValue#726293]
+- StreamingDataSourceV2Relation [value#726289], org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder@29f58ed5, MemoryStream[value#726289], 5, 6
== Analyzed Logical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@46594d79
+- Project [key#726282, cast(window#726297-T10000ms.end as bigint) AS end#726326L, leftValue#726284, rightValue#726293]
+- Join LeftOuter, ((((key#726282 = key#726291) AND (window#726297-T10000ms = window#726302-T10000ms)) AND (leftValue#726284 > 10)) AND ((rightValue#726293 < 300) OR (rightValue#726293 > 1000)))
:- Project [key#726282, window#726298-T10000ms AS window#726297-T10000ms, leftValue#726284]
: +- Filter isnotnull(leftTime#726283-T10000ms)
: +- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) as double) = (cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) THEN (CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 10000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) as double) = (cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) THEN (CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 10000000) + 0) + 10000000), LongType, TimestampType)) AS window#726298-T10000ms, key#726282, leftTime#726283-T10000ms, leftValue#726284]
: +- EventTimeWatermark leftTime#726283: timestamp, interval 10 seconds
: +- Project [value#726280 AS key#726282, cast(value#726280 as timestamp) AS leftTime#726283, (value#726280 * 2) AS leftValue#726284]
: +- StreamingDataSourceV2Relation [value#726280], org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder@5cf1c91, MemoryStream[value#726280], 3, 4
+- Project [key#726291, window#726303-T10000ms AS window#726302-T10000ms, rightValue#726293]
+- Filter isnotnull(rightTime#726292-T10000ms)
+- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) as double) = (cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) THEN (CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 10000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) as double) = (cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) THEN (CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / cast(10000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 10000000) + 0) + 10000000), LongType, TimestampType)) AS window#726303-T10000ms, key#726291, rightTime#726292-T10000ms, rightValue#726293]
+- EventTimeWatermark rightTime#726292: timestamp, interval 10 seconds
+- Project [value#726289 AS key#726291, cast(value#726289 as timestamp) AS rightTime#726292, (value#726289 * 3) AS rightValue#726293]
+- StreamingDataSourceV2Relation [value#726289], org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder@29f58ed5, MemoryStream[value#726289], 5, 6
== Optimized Logical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@46594d79
+- Project [key#726282, cast(window#726297-T10000ms.end as bigint) AS end#726326L, leftValue#726284, rightValue#726293]
+- Join LeftOuter, (((leftValue#726284 > 10) AND (key#726282 = key#726291)) AND (window#726297-T10000ms = window#726302-T10000ms))
:- Project [key#726282, named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = (cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = (cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 10000000), LongType, TimestampType)) AS window#726297-T10000ms, leftValue#726284]
: +- EventTimeWatermark leftTime#726283: timestamp, interval 10 seconds
: +- Project [value#726280 AS key#726282, cast(value#726280 as timestamp) AS leftTime#726283, (value#726280 * 2) AS leftValue#726284]
: +- StreamingDataSourceV2Relation [value#726280], org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder@5cf1c91, MemoryStream[value#726280], 3, 4
+- Project [key#726291, named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = (cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = (cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 10000000), LongType, TimestampType)) AS window#726302-T10000ms, rightValue#726293]
+- EventTimeWatermark rightTime#726292: timestamp, interval 10 seconds
+- Project [value#726289 AS key#726291, cast(value#726289 as timestamp) AS rightTime#726292, (value#726289 * 3) AS rightValue#726293]
+- Filter (((value#726289 * 3) < 300) OR ((value#726289 * 3) > 1000))
+- StreamingDataSourceV2Relation [value#726289], org.apache.spark.sql.execution.streaming.MemoryStreamScanBuilder@29f58ed5, MemoryStream[value#726289], 5, 6
== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@46594d79
+- *(5) Project [key#726282, cast(window#726297-T10000ms.end as bigint) AS end#726326L, leftValue#726284, rightValue#726293]
+- StreamingSymmetricHashJoin [key#726282, window#726297-T10000ms], [key#726291, window#726302-T10000ms], LeftOuter, condition = [ leftOnly = (leftValue#726284 > 10), rightOnly = null, both = null, full = (leftValue#726284 > 10) ], state info [ checkpoint = file:/home/jenkins/workspace/spark-master-test-maven-hadoop-3.2-jdk-11/sql/core/target/tmp/streaming.metadata-c792c067-8711-4554-8e06-1586240169fe/state, runId = f9b87d2b-6ebb-48d9-b207-76e9cececbfc, opId = 0, ver = 9, numPartitions = 5], 60000, state cleanup [ left key predicate: (input[1, struct<start:timestamp,end:timestamp>, false].end <= 60000000), right key predicate: (input[1, struct<start:timestamp,end:timestamp>, false].end <= 60000000) ]
:- Exchange hashpartitioning(key#726282, window#726297-T10000ms, 5)
: +- *(2) Project [key#726282, named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = (cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = (cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE CEIL((cast((precisetimestampconversion(leftTime#726283-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 10000000), LongType, TimestampType)) AS window#726297-T10000ms, leftValue#726284]
: +- EventTimeWatermark leftTime#726283: timestamp, interval 10 seconds
: +- *(1) Project [value#726280 AS key#726282, cast(value#726280 as timestamp) AS leftTime#726283, (value#726280 * 2) AS leftValue#726284]
: +- *(1) Project [value#726280]
: +- *(1) MicroBatchScan[value#726280] MemoryStreamDataSource
+- Exchange hashpartitioning(key#726291, window#726302-T10000ms, 5)
+- *(4) Project [key#726291, named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = (cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = (cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE CEIL((cast((precisetimestampconversion(rightTime#726292-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 10000000), LongType, TimestampType)) AS window#726302-T10000ms, rightValue#726293]
+- EventTimeWatermark rightTime#726292: timestamp, interval 10 seconds
+- *(3) Project [value#726289 AS key#726291, cast(value#726289 as timestamp) AS rightTime#726292, (value#726289 * 3) AS rightValue#726293]
+- *(3) Filter (((value#726289 * 3) < 300) OR ((value#726289 * 3) > 1000))
+- *(3) Project [value#726289]
+- *(3) MicroBatchScan[value#726289] MemoryStreamDataSource
at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:528)
at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:527)
at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
at org.scalatest.Assertions.fail(Assertions.scala:1089)
at org.scalatest.Assertions.fail$(Assertions.scala:1085)
at org.scalatest.FunSuite.fail(FunSuite.scala:1560)
at org.apache.spark.sql.streaming.StreamTest.failTest$1(StreamTest.scala:444)
at org.apache.spark.sql.streaming.StreamTest.liftedTree1$1(StreamTest.scala:780)
at org.apache.spark.sql.streaming.StreamTest.testStream(StreamTest.scala:756)
at org.apache.spark.sql.streaming.StreamTest.testStream$(StreamTest.scala:326)
at org.apache.spark.sql.streaming.StreamingOuterJoinSuite.testStream(StreamingJoinSuite.scala:424)
at org.apache.spark.sql.streaming.StreamingOuterJoinSuite.$anonfun$new$32(StreamingJoinSuite.scala:712)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:149)
at org.scalatest.FunSuiteLike.invokeWithFixture$1(FunSuiteLike.scala:184)
at org.scalatest.FunSuiteLike.$anonfun$runTest$1(FunSuiteLike.scala:196)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
at org.scalatest.FunSuiteLike.runTest(FunSuiteLike.scala:196)
at org.scalatest.FunSuiteLike.runTest$(FunSuiteLike.scala:178)
at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:56)
at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:221)
at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:214)
at org.apache.spark.sql.streaming.StreamingOuterJoinSuite.org$scalatest$BeforeAndAfter$$super$runTest(StreamingJoinSuite.scala:424)
at org.scalatest.BeforeAndAfter.runTest(BeforeAndAfter.scala:203)
at org.scalatest.BeforeAndAfter.runTest$(BeforeAndAfter.scala:192)
at org.apache.spark.sql.streaming.StreamingOuterJoinSuite.runTest(StreamingJoinSuite.scala:424)
at org.scalatest.FunSuiteLike.$anonfun$runTests$1(FunSuiteLike.scala:229)
at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:396)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:379)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
at org.scalatest.FunSuiteLike.runTests(FunSuiteLike.scala:229)
at org.scalatest.FunSuiteLike.runTests$(FunSuiteLike.scala:228)
at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
at org.scalatest.Suite.run(Suite.scala:1147)
at org.scalatest.Suite.run$(Suite.scala:1129)
at org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
at org.scalatest.FunSuiteLike.$anonfun$run$1(FunSuiteLike.scala:233)
at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
at org.scalatest.FunSuiteLike.run(FunSuiteLike.scala:233)
at org.scalatest.FunSuiteLike.run$(FunSuiteLike.scala:232)
at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:56)
at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
at org.apache.spark.sql.streaming.StreamingOuterJoinSuite.org$scalatest$BeforeAndAfter$$super$run(StreamingJoinSuite.scala:424)
at org.scalatest.BeforeAndAfter.run(BeforeAndAfter.scala:258)
at org.scalatest.BeforeAndAfter.run$(BeforeAndAfter.scala:256)
at org.apache.spark.sql.streaming.StreamingOuterJoinSuite.run(StreamingJoinSuite.scala:424)
at org.scalatest.Suite.callExecuteOnSuite$1(Suite.scala:1210)
at org.scalatest.Suite.$anonfun$runNestedSuites$1(Suite.scala:1257)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at org.scalatest.Suite.runNestedSuites(Suite.scala:1255)
at org.scalatest.Suite.runNestedSuites$(Suite.scala:1189)
at org.scalatest.tools.DiscoverySuite.runNestedSuites(DiscoverySuite.scala:30)
at org.scalatest.Suite.run(Suite.scala:1144)
at org.scalatest.Suite.run$(Suite.scala:1129)
at org.scalatest.tools.DiscoverySuite.run(DiscoverySuite.scala:30)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)
at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1346)
at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1340)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1340)
at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:1031)
at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:1010)
at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1506)
at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)
at org.scalatest.tools.Runner$.main(Runner.scala:827)
at org.scalatest.tools.Runner.main(Runner.scala)