package com.qiniu.stream.core.source;

import com.qiniu.stream.core.config.SourceTable;
import com.qiniu.stream.core.listener.KafkaLagListener;
import org.apache.spark.sql.SparkSession;
import scala.Option;
import scala.Predef$;
import scala.Serializable;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

/* compiled from: StreamReader.scala */
/* loaded from: input_file:com/qiniu/stream/core/source/StreamReader$$anonfun$enableKafkaLagListener$1.class */
/**
 * Function object (compiled from a Scala closure in {@code StreamReader.scala}) that
 * registers a {@link KafkaLagListener} on the captured {@link SparkSession}.
 *
 * <p>It captures three values at construction time: the enclosing {@link StreamReader}
 * (used only for logging), the Spark session whose stream manager receives the listener,
 * and the {@link SourceTable} whose connector supplies the Kafka bootstrap servers.
 *
 * <p>The field names and {@code serialVersionUID} are part of the serialized form of this
 * class and are therefore left exactly as the Scala compiler generated them.
 */
public final class StreamReader$$anonfun$enableKafkaLagListener$1 extends AbstractFunction1<String, BoxedUnit> implements Serializable {
    public static final long serialVersionUID = 0;
    private final /* synthetic */ StreamReader $outer;
    private final SparkSession sparkSession$1;
    private final SourceTable sourceTable$1;

    /**
     * Captures the values the closure needs when it is later applied.
     *
     * @param streamReader the enclosing reader; must not be {@code null}
     * @param sparkSession the session on which the listener will be registered
     * @param sourceTable  the table whose connector options are consulted
     * @throws NullPointerException if {@code streamReader} is {@code null}
     */
    public StreamReader$$anonfun$enableKafkaLagListener$1(StreamReader streamReader, SparkSession sparkSession, SourceTable sourceTable) {
        // Compiler-generated outer-instance guard: "throw null" raises a NullPointerException.
        if (streamReader == null) {
            throw null;
        }
        this.sourceTable$1 = sourceTable;
        this.sparkSession$1 = sparkSession;
        this.$outer = streamReader;
    }

    /**
     * Registers a Kafka lag listener built from {@code str} and the connector's
     * {@code kafka.bootstrap.servers} option.
     *
     * <p>The option must be defined; otherwise {@code Predef.require} rejects the call
     * before anything is logged or registered.
     *
     * @param str the value this function is applied to; passed as the first constructor
     *            argument of {@link KafkaLagListener}
     */
    public final void apply(String str) {
        final Option<String> bootstrapServersOpt = this.sourceTable$1.connector().option("kafka.bootstrap.servers");
        final boolean bootstrapServersPresent = bootstrapServersOpt.isDefined();
        Predef$.MODULE$.require(bootstrapServersPresent);

        this.$outer.log().info("register streaming query listener for kafka streaming");

        // Safe to unwrap here: the require() above guarantees the option is defined.
        final String bootstrapServers = (String) bootstrapServersOpt.get();
        this.sparkSession$1.streams().addListener(new KafkaLagListener(str, bootstrapServers));
    }

    /**
     * Erased entry point: casts the argument to {@code String}, delegates to
     * {@link #apply(String)}, and returns the unit value.
     */
    public final /* bridge */ /* synthetic */ Object apply(Object obj) {
        final String argument = (String) obj;
        apply(argument);
        return BoxedUnit.UNIT;
    }
}
