org.apache.spark.sql.streaming.StreamingQueryException: Query [id = f41bb5d3-3e68-4dab-8d97-cc94e771c4ad, runId = 274d53d4-0b13-4e22-81c0-797443617a30] terminated with exception: element cannot be mapped to a null key
sbt.ForkMain$ForkError: org.apache.spark.sql.streaming.StreamingQueryException: Query [id = f41bb5d3-3e68-4dab-8d97-cc94e771c4ad, runId = 274d53d4-0b13-4e22-81c0-797443617a30] terminated with exception: element cannot be mapped to a null key
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:353)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Caused by: sbt.ForkMain$ForkError: java.lang.NullPointerException: element cannot be mapped to a null key
at java.util.Objects.requireNonNull(Objects.java:228)
at java.util.stream.Collectors.lambda$groupingBy$45(Collectors.java:907)
at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1696)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at org.apache.kafka.clients.consumer.internals.Fetcher.regroupPartitionMapByNode(Fetcher.java:1120)
at org.apache.kafka.clients.consumer.internals.Fetcher.groupListOffsetRequests(Fetcher.java:887)
at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsAsync(Fetcher.java:684)
at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsIfNeeded(Fetcher.java:454)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2260)
at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1706)
at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1665)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$fetchLatestOffsets$10(KafkaOffsetReader.scala:289)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:237)
at scala.collection.TraversableLike.map$(TraversableLike.scala:230)
at scala.collection.mutable.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:48)
at scala.collection.SetLike.map(SetLike.scala:104)
at scala.collection.SetLike.map$(SetLike.scala:104)
at scala.collection.mutable.AbstractSet.map(Set.scala:48)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$fetchLatestOffsets$2(KafkaOffsetReader.scala:289)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReader.scala:386)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.withRetriesWithoutInterrupt(KafkaOffsetReader.scala:385)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$fetchLatestOffsets$1(KafkaOffsetReader.scala:243)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:353)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchLatestOffsets(KafkaOffsetReader.scala:243)
at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.latestOffset(KafkaMicroBatchStream.scala:88)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$4(MicroBatchExecution.scala:369)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:328)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:326)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:365)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.map(TraversableLike.scala:237)
at scala.collection.TraversableLike.map$(TraversableLike.scala:230)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:357)
at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:579)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:353)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:198)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:328)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:326)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:181)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:175)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:332)
... 1 more
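
The root cause is visible in the innermost frames: java.util.Objects.requireNonNull is invoked from Collectors.lambda$groupingBy, reached via Fetcher.regroupPartitionMapByNode while the consumer resets offsets. Collectors.groupingBy rejects null keys, and regroupPartitionMapByNode is, per its name, regrouping a partition map by node, so a partition whose leader node is currently unknown (i.e. null, as can happen during a broker restart or a stale metadata window) triggers exactly this NPE. Below is a minimal Java sketch of just that java.util.stream behavior; the map and the string keys/values are illustrative stand-ins for Kafka's TopicPartition/Node types, not the actual Fetcher code.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupByNullKeyRepro {
    public static void main(String[] args) {
        // Hypothetical stand-in for a partition -> leader-node map;
        // a partition with no known leader maps to null.
        Map<String, String> leaderByPartition = new HashMap<>();
        leaderByPartition.put("topic-0", "broker-1");
        leaderByPartition.put("topic-1", null); // leader unknown

        // Collectors.groupingBy calls Objects.requireNonNull on each
        // classifier result, so the null leader throws the same NPE
        // seen in the trace:
        //   java.lang.NullPointerException: element cannot be mapped to a null key
        Map<String, List<String>> partitionsByNode = leaderByPartition.entrySet().stream()
                .collect(Collectors.groupingBy(
                        Map.Entry::getValue, // classifier returns null for topic-1
                        Collectors.mapping(Map.Entry::getKey, Collectors.toList())));
        System.out.println(partitionsByNode);
    }
}

Because the NPE escapes from KafkaOffsetReader.fetchLatestOffsets inside MicroBatchExecution.constructNextBatch, StreamExecution wraps it as the StreamingQueryException shown at the top of the trace and the query terminates.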