package com.qiniu.stream.core.source;

import com.qiniu.stream.core.config.RowFormat;
import com.qiniu.stream.core.config.RowTime;
import com.qiniu.stream.core.config.Schema;
import com.qiniu.stream.core.config.Schema$;
import com.qiniu.stream.core.config.SourceTable;
import com.qiniu.stream.core.config.TimeField;
import com.qiniu.stream.core.source.WaterMarker;
import com.qiniu.stream.util.Logging;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.avro.package$;
import org.apache.spark.sql.functions$;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import scala.Function0;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.Seq;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.reflect.ScalaSignature;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;

/* compiled from: StreamReader.scala */
@ScalaSignature(bytes = "\u0006\u0001U4A!\u0001\u0002\u0001\u001b\ta1\u000b\u001e:fC6\u0014V-\u00193fe*\u00111\u0001B\u0001\u0007g>,(oY3\u000b\u0005\u00151\u0011\u0001B2pe\u0016T!a\u0002\u0005\u0002\rM$(/Z1n\u0015\tI!\"A\u0003rS:LWOC\u0001\f\u0003\r\u0019w.\\\u0002\u0001'\u0015\u0001a\u0002\u0006\r\u001c!\ty!#D\u0001\u0011\u0015\u0005\t\u0012!B:dC2\f\u0017BA\n\u0011\u0005\u0019\te.\u001f*fMB\u0011QCF\u0007\u0002\u0005%\u0011qC\u0001\u0002\u0007%\u0016\fG-\u001a:\u0011\u0005UI\u0012B\u0001\u000e\u0003\u0005-9\u0016\r^3s\u001b\u0006\u00148.\u001a:\u0011\u0005qyR\"A\u000f\u000b\u0005y1\u0011\u0001B;uS2L!\u0001I\u000f\u0003\u000f1{wmZ5oO\")!\u0005\u0001C\u0001G\u00051A(\u001b8jiz\"\u0012\u0001\n\t\u0003+\u0001AQA\n\u0001\u0005\n\u001d\n\u0011B[:p]R\u000b'\r\\3\u0015\u0007!Bt\t\u0005\u0002*k9\u0011!fM\u0007\u0002W)\u0011A&L\u0001\u0004gFd'B\u0001\u00180\u0003\u0015\u0019\b/\u0019:l\u0015\t\u0001\u0014'\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002e\u0005\u0019qN]4\n\u0005QZ\u0013a\u00029bG.\fw-Z\u0005\u0003m]\u0012\u0011\u0002R1uC\u001a\u0013\u0018-\\3\u000b\u0005QZ\u0003\"B\u001d&\u0001\u0004Q\u0014!\u0002;bE2,\u0007CA\u001e6\u001d\ta4G\u0004\u0002>\r:\u0011a(\u0012\b\u0003\u007f\u0011s!\u0001Q\"\u000e\u0003\u0005S!A\u0011\u0007\u0002\rq\u0012xn\u001c;?\u0013\u0005\u0011\u0014B\u0001\u00192\u0013\tqs&\u0003\u0002-[!)\u0001*\na\u0001\u0013\u0006Y1o\\;sG\u0016$\u0016M\u00197f!\tQU*D\u0001L\u0015\taE!\u0001\u0004d_:4\u0017nZ\u0005\u0003\u001d.\u00131bU8ve\u000e,G+\u00192mK\")\u0001\u000b\u0001C\u0005#\u0006I\u0011M\u001e:p)\u0006\u0014G.\u001a\u000b\u0004QI;\u0006\"B*P\u0001\u0004!\u0016AC1we>4uN]7biB\u0011!*V\u0005\u0003-.\u0013\u0011BU8x\r>\u0014X.\u0019;\t\u000bez\u0005\u0019\u0001\u001e\t\u000be\u0003A\u0011\u0002.\u0002\u0011\r\u001ch\u000fV1cY\u0016$2\u0001K.^\u0011\u0015a\u0006\f1\u0001U\u0003%\u00197O\u001e$pe6\fG\u000fC\u0003:1\u0002\u0007!\bC\u0003`\u0001\u0011\u0005\u0003-\u0001\u0003sK\u0006$Gc\u0001\u001ebM\")!M\u0018a\u0001G\u0006a1\u000f]1sWN+7o]5p]B\u0011!\u0006Z\u0005\u0003K.\u0012Ab\u00159be.\u001cVm]:j_:DQ\u0001\u00130A\u0002%CQ\u0001\u001b\u0001\u0005\n%\f!B]3hKb$\u0016M\u00197f)\u0011A#n\u001b7\t\u000b\t<\u0007\u0019A2\t\u000be:\u0007\u0019\u0001\u001e\t\u000b!;\u0007\u0019A%\t\u000b9\u0004A\u0011B8\u0002-\u0015t\u0017M\u00197f\u0017\u000647.\u0019'bO2K7\u000f^3oKJ$2\u0001]:u!\ty\u0011/\u0003\u0002s!\t!QK\\5u\u0011\u0015\u0011W\u000e1\u0001d\u0011\u0015AU\u000e1\u0001J\u0001")
/* loaded from: input_file:com/qiniu/stream/core/source/StreamReader.class */
public class StreamReader implements Reader, WaterMarker {
    private transient Logger log;

    @Override // com.qiniu.stream.core.source.WaterMarker
    public Dataset<Row> withWaterMark(Dataset<Row> dataset, Option<RowTime> option) {
        return WaterMarker.Cclass.withWaterMark(this, dataset, option);
    }

    public Logger log() {
        return this.log;
    }

    public void log_$eq(Logger logger) {
        this.log = logger;
    }

    public void logInfo(Function0<String> function0) {
        Logging.class.logInfo(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.class.logDebug(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.class.logTrace(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.class.logWarning(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.class.logError(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.class.logInfo(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.class.logDebug(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.class.logTrace(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.class.logWarning(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.class.logError(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.class.isTraceEnabled(this);
    }

    private Dataset<Row> jsonTable(Dataset<Row> dataset, SourceTable sourceTable) {
        return dataset.withColumn("value", dataset.col("value").cast(DataTypes.StringType)).withColumn("value", functions$.MODULE$.from_json(functions$.MODULE$.col("value"), Schema$.MODULE$.RichSchema((Schema) sourceTable.schema().get()).structType())).select("value.*", Predef$.MODULE$.wrapRefArray((String[]) Predef$.MODULE$.refArrayOps(dataset.schema().fieldNames()).filterNot(new StreamReader$$anonfun$1(this))));
    }

    private Dataset<Row> avroTable(RowFormat rowFormat, Dataset<Row> dataset) {
        String[] strArr = (String[]) Predef$.MODULE$.refArrayOps(dataset.schema().fieldNames()).filterNot(new StreamReader$$anonfun$2(this));
        Predef$.MODULE$.require(rowFormat.props().contains("jsonSchema"), new StreamReader$$anonfun$avroTable$1(this));
        return dataset.withColumn("value", package$.MODULE$.from_avro(dataset.col("value"), (String) rowFormat.props().apply("jsonSchema"))).select("value.*", Predef$.MODULE$.wrapRefArray(strArr));
    }

    private Dataset<Row> csvTable(RowFormat rowFormat, Dataset<Row> dataset) {
        return dataset.withColumn("value", dataset.col("value").cast(DataTypes.StringType));
    }

    @Override // com.qiniu.stream.core.source.Reader
    public Dataset<Row> read(SparkSession sparkSession, SourceTable sourceTable) {
        Dataset<Row> dataset;
        Predef$.MODULE$.require(sourceTable.schema().isDefined(), new StreamReader$$anonfun$read$1(this));
        Dataset<Row> load = sparkSession.readStream().format(sourceTable.connector().name()).options(sourceTable.connector().options()).load();
        enableKafkaLagListener(sparkSession, sourceTable);
        RowFormat format = sourceTable.format();
        Dataset<Row> jsonTable = format.isJsonFormat() ? jsonTable(load, sourceTable) : format.isAvroFormat() ? avroTable(format, load) : format.isCsvFormat() ? csvTable(format, load) : format.isRegExFormat() ? regexTable(sparkSession, load, sourceTable) : format.isTextFormat() ? load : jsonTable(load, sourceTable);
        Some timeField = ((Schema) sourceTable.schema().get()).timeField();
        if (timeField instanceof Some) {
            TimeField timeField2 = (TimeField) timeField.x();
            if (timeField2 instanceof RowTime) {
                dataset = withWaterMark(jsonTable, new Some((RowTime) timeField2));
                return dataset;
            }
        }
        dataset = jsonTable;
        return dataset;
    }

    private Dataset<Row> regexTable(SparkSession sparkSession, Dataset<Row> dataset, SourceTable sourceTable) {
        Option option = sourceTable.format().props().get("pattern");
        Predef$.MODULE$.require(option.isDefined(), new StreamReader$$anonfun$regexTable$1(this));
        String ddl = Schema$.MODULE$.RichSchema((Schema) sourceTable.schema().get()).toDDL();
        StructType structType = Schema$.MODULE$.RichSchema((Schema) sourceTable.schema().get()).structType();
        String str = (String) option.get();
        sparkSession.udf().register("regex2Json", functions$.MODULE$.udf(new StreamReader$$anonfun$regexTable$2(this, ddl, str), scala.reflect.runtime.package$.MODULE$.universe().TypeTag().apply(scala.reflect.runtime.package$.MODULE$.universe().runtimeMirror(StreamReader.class.getClassLoader()), new TypeCreator(this) { // from class: com.qiniu.stream.core.source.StreamReader$$typecreator1$1
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe = mirror.universe();
                return universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().SingleType(universe.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticModule("scala.Predef")), universe.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$);
            }
        }), scala.reflect.runtime.package$.MODULE$.universe().TypeTag().apply(scala.reflect.runtime.package$.MODULE$.universe().runtimeMirror(StreamReader.class.getClassLoader()), new TypeCreator(this) { // from class: com.qiniu.stream.core.source.StreamReader$$typecreator2$1
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe = mirror.universe();
                return universe.internal().reificationSupport().TypeRef(universe.internal().reificationSupport().SingleType(universe.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticModule("scala.Predef")), universe.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$);
            }
        })));
        return dataset.withColumn("kafkaValue", functions$.MODULE$.struct((Seq) List$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new String[]{"key", "partition", "offset", "timestamp", "timestampType", "topic"})).map(new StreamReader$$anonfun$regexTable$3(this), List$.MODULE$.canBuildFrom()))).selectExpr(Predef$.MODULE$.wrapRefArray(new String[]{"regex2json(CAST(value AS STRING)) as jsonValue", "kafkaValue"})).withColumn("value", functions$.MODULE$.from_json(functions$.MODULE$.col("jsonValue"), structType)).select("value.*", Predef$.MODULE$.wrapRefArray(new String[]{"kafkaValue"}));
    }

    private void enableKafkaLagListener(SparkSession sparkSession, SourceTable sourceTable) {
        sourceTable.connector().option("group_id").foreach(new StreamReader$$anonfun$enableKafkaLagListener$1(this, sparkSession, sourceTable));
    }

    public StreamReader() {
        Logging.class.$init$(this);
        WaterMarker.Cclass.$init$(this);
    }
}
