org.apache.spark.sql.streaming.StreamingQueryException: Query [id = f41bb5d3-3e68-4dab-8d97-cc94e771c4ad, runId = 274d53d4-0b13-4e22-81c0-797443617a30] terminated with exception: element cannot be mapped to a null key

sbt.ForkMain$ForkError: org.apache.spark.sql.streaming.StreamingQueryException: Query [id = f41bb5d3-3e68-4dab-8d97-cc94e771c4ad, runId = 274d53d4-0b13-4e22-81c0-797443617a30] terminated with exception: element cannot be mapped to a null key
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:353)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Caused by: sbt.ForkMain$ForkError: java.lang.NullPointerException: element cannot be mapped to a null key
	at java.util.Objects.requireNonNull(Objects.java:228)
	at java.util.stream.Collectors.lambda$groupingBy$45(Collectors.java:907)
	at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
	at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1696)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
	at org.apache.kafka.clients.consumer.internals.Fetcher.regroupPartitionMapByNode(Fetcher.java:1120)
	at org.apache.kafka.clients.consumer.internals.Fetcher.groupListOffsetRequests(Fetcher.java:887)
	at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsAsync(Fetcher.java:684)
	at org.apache.kafka.clients.consumer.internals.Fetcher.resetOffsetsIfNeeded(Fetcher.java:454)
	at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2260)
	at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1706)
	at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1665)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$fetchLatestOffsets$10(KafkaOffsetReader.scala:289)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:237)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:230)
	at scala.collection.mutable.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:48)
	at scala.collection.SetLike.map(SetLike.scala:104)
	at scala.collection.SetLike.map$(SetLike.scala:104)
	at scala.collection.mutable.AbstractSet.map(Set.scala:48)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$fetchLatestOffsets$2(KafkaOffsetReader.scala:289)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$withRetriesWithoutInterrupt$1(KafkaOffsetReader.scala:386)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader.withRetriesWithoutInterrupt(KafkaOffsetReader.scala:385)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader.$anonfun$fetchLatestOffsets$1(KafkaOffsetReader.scala:243)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:353)
	at org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchLatestOffsets(KafkaOffsetReader.scala:243)
	at org.apache.spark.sql.kafka010.KafkaMicroBatchStream.latestOffset(KafkaMicroBatchStream.scala:88)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$4(MicroBatchExecution.scala:369)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:328)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:326)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$2(MicroBatchExecution.scala:365)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:237)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at scala.collection.TraversableLike.map(TraversableLike.scala:237)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:230)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$constructNextBatch$1(MicroBatchExecution.scala:357)
	at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:579)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.constructNextBatch(MicroBatchExecution.scala:353)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:198)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:328)
	at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:326)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:181)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:175)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:332)
	... 1 more