package com.qiniu.stream.core.sink;

import com.qiniu.stream.core.config.Connector;
import com.qiniu.stream.core.config.SinkTable;
import com.qiniu.stream.core.config.SinkTable$;
import com.qiniu.stream.core.util.StreamOptions$;
import com.qiniu.stream.util.Logging;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.slf4j.Logger;
import scala.Function0;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.reflect.ScalaSignature;
import scala.runtime.ObjectRef;

/* compiled from: StreamWriter.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005ua\u0001B\u0001\u0003\u00015\u0011Ab\u0015;sK\u0006lwK]5uKJT!a\u0001\u0003\u0002\tMLgn\u001b\u0006\u0003\u000b\u0019\tAaY8sK*\u0011q\u0001C\u0001\u0007gR\u0014X-Y7\u000b\u0005%Q\u0011!B9j]&,(\"A\u0006\u0002\u0007\r|Wn\u0001\u0001\u0014\t\u0001qA\u0003\u0007\t\u0003\u001fIi\u0011\u0001\u0005\u0006\u0002#\u0005)1oY1mC&\u00111\u0003\u0005\u0002\u0007\u0003:L(+\u001a4\u0011\u0005U1R\"\u0001\u0002\n\u0005]\u0011!AB,sSR,'\u000f\u0005\u0002\u001a95\t!D\u0003\u0002\u001c\r\u0005!Q\u000f^5m\u0013\ti\"DA\u0004M_\u001e<\u0017N\\4\t\u000b}\u0001A\u0011\u0001\u0011\u0002\rqJg.\u001b;?)\u0005\t\u0003CA\u000b\u0001\u0011\u0015\u0019\u0003\u0001\"\u0011%\u0003\u00159(/\u001b;f)\r)\u0003\u0006\u0012\t\u0003\u001f\u0019J!a\n\t\u0003\tUs\u0017\u000e\u001e\u0005\u0006S\t\u0002\rAK\u0001\nI\u0006$\u0018M\u0012:b[\u0016\u0004\"aK!\u000f\u00051rdBA\u0017<\u001d\tq\u0003H\u0004\u00020k9\u0011\u0001gM\u0007\u0002c)\u0011!\u0007D\u0001\u0007yI|w\u000e\u001e \n\u0003Q\n1a\u001c:h\u0013\t1t'\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002i%\u0011\u0011HO\u0001\u0006gB\f'o\u001b\u0006\u0003m]J!\u0001P\u001f\u0002\u0007M\fHN\u0003\u0002:u%\u0011q\bQ\u0001\ba\u0006\u001c7.Y4f\u0015\taT(\u0003\u0002C\u0007\nIA)\u0019;b\rJ\fW.\u001a\u0006\u0003\u007f\u0001CQ!\u0012\u0012A\u0002\u0019\u000b\u0011b]5oWR\u000b'\r\\3\u0011\u0005\u001dSU\"\u0001%\u000b\u0005%#\u0011AB2p]\u001aLw-\u0003\u0002L\u0011\nI1+\u001b8l)\u0006\u0014G.\u001a\u0005\u0006\u001b\u0002!IAT\u0001\u0019gR\f'\u000f^'vYRL\u0007\u000f\\3TiJ,\u0017-\\)vKJLH\u0003B(V-\u000e\u0004\"\u0001U*\u000e\u0003ES!A\u0015!\u0002\u0013M$(/Z1nS:<\u0017B\u0001+R\u00059\u0019FO]3b[&tw-U;fefDQ!\u000b'A\u0002)BQa\u0016'A\u0002a\u000b!bY8o]\u0016\u001cGo\u001c:t!\rIV\f\u0019\b\u00035rs!\u0001M.\n\u0003EI!a\u0010\t\n\u0005y{&aA*fc*\u0011q\b\u0005\t\u0003\u000f\u0006L!A\u0019%\u0003\u0013\r{gN\\3di>\u0014\b\"B#M\u0001\u00041\u0005\"B3\u0001\t\u00131\u0017AC<sSR,')\u0019;dQR)QeZ8rs\")\u0011\u0006\u001aa\u0001QB\u0019\u0011N\u001b7\u000e\u0003\u0001K!a\u001b!\u0003\u000f\u0011\u000bG/Y:fiB\u0011\u0011.\\\u0005\u0003]\u0002\u00131AU8x\u0011\u0015\u0001H\r1\u0001a\u0003%\u0019wN\u001c8fGR|'\u000fC\u0003sI\u0002\u00071/A\u0004cCR\u001c\u0007.\u00133\u0011\u0007=!h/\u0003\u0002v!\t1q\n\u001d;j_:\u0004\"aD<\n\u0005a\u0004\"\u0001\u0002'p]\u001eDQ!\u00123A\u0002\u0019CQa\u001f\u0001\u0005\nq\f1c\u001d;beR|e.Z*ue\u0016\fW.U;fef$BaT?\u007f\u007f\")\u0011F\u001fa\u0001U!)\u0001O\u001fa\u0001A\")QI\u001fa\u0001\r\"9\u00111\u0001\u0001\u0005\n\u0005\u0015\u0011a\u00048foN#(/Z1n/JLG/\u001a:\u0015\r\u0005\u001d\u0011QBA\b!\u0011\u0001\u0016\u0011\u00027\n\u0007\u0005-\u0011K\u0001\tECR\f7\u000b\u001e:fC6<&/\u001b;fe\"1\u0011&!\u0001A\u0002)Ba!RA\u0001\u0001\u00041\u0005bBA\n\u0001\u0011%\u0011QC\u0001\u0011o&$\bn\u0015;sK\u0006lw\n\u001d;j_:$R!JA\f\u00037A\u0001\"!\u0007\u0002\u0012\u0001\u0007\u0011qA\u0001\u0007oJLG/\u001a:\t\r\u0015\u000b\t\u00021\u0001G\u0001")
/* loaded from: input_file:com/qiniu/stream/core/sink/StreamWriter.class */
public class StreamWriter implements Writer, Logging {
    private transient Logger log;

    public Logger log() {
        return this.log;
    }

    public void log_$eq(Logger logger) {
        this.log = logger;
    }

    public void logInfo(Function0<String> function0) {
        Logging.class.logInfo(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.class.logDebug(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.class.logTrace(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.class.logWarning(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.class.logError(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.class.logInfo(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.class.logDebug(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.class.logTrace(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.class.logWarning(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.class.logError(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.class.isTraceEnabled(this);
    }

    @Override // com.qiniu.stream.core.sink.Writer
    public void write(Dataset<Row> dataset, SinkTable sinkTable) {
        if (sinkTable.connectors().size() == 1) {
            startOneStreamQuery(dataset, (Connector) sinkTable.connectors().head(), sinkTable);
        } else {
            startMultipleStreamQuery(dataset, sinkTable.connectors(), sinkTable);
        }
    }

    private StreamingQuery startMultipleStreamQuery(Dataset<Row> dataset, Seq<Connector> seq, SinkTable sinkTable) {
        Predef$.MODULE$.require(!seq.exists(new StreamWriter$$anonfun$startMultipleStreamQuery$2(this)), new StreamWriter$$anonfun$startMultipleStreamQuery$1(this));
        DataStreamWriter<Row> newStreamWriter = newStreamWriter(dataset, sinkTable);
        newStreamWriter.foreachBatch(new StreamWriter$$anonfun$startMultipleStreamQuery$3(this, seq, sinkTable));
        return newStreamWriter.start();
    }

    public void com$qiniu$stream$core$sink$StreamWriter$$writeBatch(Dataset<Row> dataset, Connector connector, Option<Object> option, SinkTable sinkTable) {
        ObjectRef create = ObjectRef.create((Dataset) connector.conditionExpr().map(new StreamWriter$$anonfun$3(this, dataset)).getOrElse(new StreamWriter$$anonfun$4(this, dataset)));
        create.elem = (Dataset) connector.includeColumns().map(new StreamWriter$$anonfun$com$qiniu$stream$core$sink$StreamWriter$$writeBatch$1(this, create)).getOrElse(new StreamWriter$$anonfun$com$qiniu$stream$core$sink$StreamWriter$$writeBatch$2(this, create));
        create.elem = (Dataset) connector.excludeColumns().map(new StreamWriter$$anonfun$com$qiniu$stream$core$sink$StreamWriter$$writeBatch$3(this, create)).getOrElse(new StreamWriter$$anonfun$com$qiniu$stream$core$sink$StreamWriter$$writeBatch$4(this, create));
        if (connector.isConsole()) {
            ((Dataset) create.elem).show();
        } else {
            new BatchWriter().write((Dataset) create.elem, option, new SinkTable(false, sinkTable.name(), sinkTable.schema(), sinkTable.format(), Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Connector[]{connector})), SinkTable$.MODULE$.apply$default$6(), SinkTable$.MODULE$.apply$default$7(), SinkTable$.MODULE$.apply$default$8()));
        }
    }

    private StreamingQuery startOneStreamQuery(Dataset<Row> dataset, Connector connector, SinkTable sinkTable) {
        DataStreamWriter<Row> newStreamWriter = newStreamWriter(dataset, sinkTable);
        if (sinkTable.option(StreamOptions$.MODULE$.batchWrite()).exists(new StreamWriter$$anonfun$5(this))) {
            newStreamWriter.foreachBatch(new StreamWriter$$anonfun$startOneStreamQuery$1(this, connector, sinkTable));
        } else {
            newStreamWriter.format(connector.name()).options(connector.options());
        }
        return newStreamWriter.start();
    }

    private DataStreamWriter<Row> newStreamWriter(Dataset<Row> dataset, SinkTable sinkTable) {
        DataStreamWriter<Row> writeStream = dataset.writeStream();
        sinkTable.updateMode().foreach(new StreamWriter$$anonfun$newStreamWriter$1(this, writeStream));
        sinkTable.partitions().foreach(new StreamWriter$$anonfun$newStreamWriter$2(this, writeStream));
        withStreamOption(writeStream, sinkTable);
        return writeStream;
    }

    private void withStreamOption(DataStreamWriter<Row> dataStreamWriter, SinkTable sinkTable) {
        Some some;
        sinkTable.option(StreamOptions$.MODULE$.queryName()).foreach(new StreamWriter$$anonfun$withStreamOption$1(this, dataStreamWriter));
        sinkTable.option(StreamOptions$.MODULE$.checkpointLocation()).foreach(new StreamWriter$$anonfun$withStreamOption$2(this, dataStreamWriter));
        Tuple2 tuple2 = new Tuple2(sinkTable.option(StreamOptions$.MODULE$.triggerMode()), sinkTable.option(StreamOptions$.MODULE$.triggerInterval()));
        if (tuple2 != null) {
            Some some2 = (Option) tuple2._1();
            Some some3 = (Option) tuple2._2();
            if (some2 instanceof Some) {
                String str = (String) some2.x();
                String triggerModeProcessingTime = StreamOptions$.MODULE$.triggerModeProcessingTime();
                if (triggerModeProcessingTime != null ? triggerModeProcessingTime.equals(str) : str == null) {
                    if (some3 instanceof Some) {
                        some = new Some(Trigger.ProcessingTime((String) some3.x()));
                        some.foreach(new StreamWriter$$anonfun$withStreamOption$3(this, dataStreamWriter));
                    }
                }
            }
        }
        if (tuple2 != null) {
            Some some4 = (Option) tuple2._1();
            Some some5 = (Option) tuple2._2();
            if (some4 instanceof Some) {
                String str2 = (String) some4.x();
                String triggerModeContinuous = StreamOptions$.MODULE$.triggerModeContinuous();
                if (triggerModeContinuous != null ? triggerModeContinuous.equals(str2) : str2 == null) {
                    if (some5 instanceof Some) {
                        some = new Some(Trigger.Continuous((String) some5.x()));
                        some.foreach(new StreamWriter$$anonfun$withStreamOption$3(this, dataStreamWriter));
                    }
                }
            }
        }
        if (tuple2 != null) {
            Some some6 = (Option) tuple2._1();
            Option option = (Option) tuple2._2();
            if (some6 instanceof Some) {
                String str3 = (String) some6.x();
                String triggerModeOnce = StreamOptions$.MODULE$.triggerModeOnce();
                if (triggerModeOnce != null ? triggerModeOnce.equals(str3) : str3 == null) {
                    if (None$.MODULE$.equals(option)) {
                        some = new Some(Trigger.Once());
                        some.foreach(new StreamWriter$$anonfun$withStreamOption$3(this, dataStreamWriter));
                    }
                }
            }
        }
        some = None$.MODULE$;
        some.foreach(new StreamWriter$$anonfun$withStreamOption$3(this, dataStreamWriter));
    }

    public StreamWriter() {
        Logging.class.$init$(this);
    }
}
