package com.qiniu.stream.core.sink;

import com.qiniu.stream.core.config.Connector;
import com.qiniu.stream.core.config.SinkTable;
import com.qiniu.stream.util.Logging;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.slf4j.Logger;
import scala.Function0;
import scala.None$;
import scala.Option;
import scala.collection.Seq;
import scala.reflect.ScalaSignature;

/* compiled from: BatchWriter.scala */
@ScalaSignature(bytes = "\u0006\u0001M4A!\u0001\u0002\u0001\u001b\tY!)\u0019;dQ^\u0013\u0018\u000e^3s\u0015\t\u0019A!\u0001\u0003tS:\\'BA\u0003\u0007\u0003\u0011\u0019wN]3\u000b\u0005\u001dA\u0011AB:ue\u0016\fWN\u0003\u0002\n\u0015\u0005)\u0011/\u001b8jk*\t1\"A\u0002d_6\u001c\u0001a\u0005\u0003\u0001\u001dQA\u0002CA\b\u0013\u001b\u0005\u0001\"\"A\t\u0002\u000bM\u001c\u0017\r\\1\n\u0005M\u0001\"AB!osJ+g\r\u0005\u0002\u0016-5\t!!\u0003\u0002\u0018\u0005\t1qK]5uKJ\u0004\"!\u0007\u000f\u000e\u0003iQ!a\u0007\u0004\u0002\tU$\u0018\u000e\\\u0005\u0003;i\u0011q\u0001T8hO&tw\rC\u0003 \u0001\u0011\u0005\u0001%\u0001\u0004=S:LGO\u0010\u000b\u0002CA\u0011Q\u0003\u0001\u0005\u0006G\u0001!\t\u0005J\u0001\u0006oJLG/\u001a\u000b\u0004K!\"\u0005CA\b'\u0013\t9\u0003C\u0001\u0003V]&$\b\"B\u0015#\u0001\u0004Q\u0013!\u00033bi\u00064%/Y7f!\tY\u0013I\u0004\u0002-}9\u0011Qf\u000f\b\u0003]ar!aL\u001b\u000f\u0005A\u001aT\"A\u0019\u000b\u0005Ib\u0011A\u0002\u001fs_>$h(C\u00015\u0003\ry'oZ\u0005\u0003m]\na!\u00199bG\",'\"\u0001\u001b\n\u0005eR\u0014!B:qCJ\\'B\u0001\u001c8\u0013\taT(A\u0002tc2T!!\u000f\u001e\n\u0005}\u0002\u0015a\u00029bG.\fw-\u001a\u0006\u0003yuJ!AQ\"\u0003\u0013\u0011\u000bG/\u0019$sC6,'BA 
A\u0011\u0015)%\u00051\u0001G\u0003%\u0019\u0018N\\6UC\ndW\r\u0005\u0002H\u00156\t\u0001J\u0003\u0002J\t\u000511m\u001c8gS\u001eL!a\u0013%\u0003\u0013MKgn\u001b+bE2,\u0007\"B\u0012\u0001\t\u0003iE\u0003B\u0013O\u001f^CQ!\u000b'A\u0002)BQ\u0001\u0015'A\u0002E\u000bqAY1uG\"LE\rE\u0002\u0010%RK!a\u0015\t\u0003\r=\u0003H/[8o!\tyQ+\u0003\u0002W!\t!Aj\u001c8h\u0011\u0015)E\n1\u0001G\u0011\u0015I\u0006\u0001\"\u0003[\u0003I\u0019H/\u0019:u\u001bVdG/\u001b9mK\n\u000bGo\u00195\u0015\u000b\u0015ZF,\u001b6\t\u000b%B\u0006\u0019\u0001\u0016\t\u000buC\u0006\u0019\u00010\u0002\u0015\r|gN\\3di>\u00148\u000fE\u0002`G\u001at!\u0001\u00192\u000f\u0005A\n\u0017\"A\t\n\u0005}\u0002\u0012B\u00013f\u0005\r\u0019V-\u001d\u0006\u0003\u007fA\u0001\"aR4\n\u0005!D%!C\"p]:,7\r^8s\u0011\u0015\u0001\u0006\f1\u0001R\u0011\u0015)\u0005\f1\u0001G\u0011\u0015a\u0007\u0001\"\u0003n\u00035\u0019H/\u0019:u\u001f:,')\u0019;dQR)QE\\8re\")\u0011f\u001ba\u0001U!)\u0001o\u001ba\u0001M\u0006I1m\u001c8oK\u000e$xN\u001d\u0005\u0006!.\u0004\r!\u0015\u0005\u0006\u000b.\u0004\rA\u0012")
/* loaded from: input_file:com/qiniu/stream/core/sink/BatchWriter.class */
/**
 * {@link Writer} implementation that writes a {@code Dataset<Row>} to every
 * connector configured on a {@link SinkTable} using Spark's batch
 * {@link DataFrameWriter} API.
 *
 * <p>This source is decompiler output of a Scala class. The {@code log*}
 * methods and {@link #isTraceEnabled()} are forwarders to the {@link Logging}
 * trait implementation, and the {@code BatchWriter$$anonfun$...} types are the
 * compiled Scala closures, whose bodies are not part of this file.
 */
public class BatchWriter implements Writer, Logging {
    /** Backing field for the logger exposed through {@link #log()} and {@link #log_$eq(Logger)}. */
    private transient Logger log;

    /** Returns the logger currently stored on this instance. */
    public Logger log() {
        return this.log;
    }

    /** Stores the given logger on this instance (Scala-style setter for {@code log}). */
    public void log_$eq(Logger logger) {
        this.log = logger;
    }

    // ---- Forwarders to the Logging trait implementation ------------------------------------

    public void logInfo(Function0<String> function0) {
        Logging.class.logInfo(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.class.logDebug(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.class.logTrace(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.class.logWarning(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.class.logError(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.class.logInfo(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.class.logDebug(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.class.logTrace(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.class.logWarning(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.class.logError(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.class.isTraceEnabled(this);
    }

    /**
     * Writes {@code dataset} to the connectors of {@code sinkTable} without a
     * batch id; delegates to {@link #write(Dataset, Option, SinkTable)} with
     * {@code None}.
     *
     * @param dataset   rows to write
     * @param sinkTable sink definition supplying the connectors and write settings
     */
    @Override // com.qiniu.stream.core.sink.Writer
    public void write(Dataset<Row> dataset, SinkTable sinkTable) {
        write(dataset, None$.MODULE$, sinkTable);
    }

    /**
     * Writes {@code dataset} to the connectors of {@code sinkTable}.
     *
     * <p>When exactly one connector is configured it is written directly; for
     * any other connector count the multi-connector path is taken, which caches
     * the dataset around the writes.
     *
     * @param dataset   rows to write
     * @param option    optional batch id, passed through unchanged to the write helpers
     * @param sinkTable sink definition supplying the connectors and write settings
     */
    public void write(Dataset<Row> dataset, Option<Object> option, SinkTable sinkTable) {
        if (sinkTable.connectors().size() == 1) {
            com$qiniu$stream$core$sink$BatchWriter$$startOneBatch(dataset, (Connector) sinkTable.connectors().head(), option, sinkTable);
        } else {
            startMultipleBatch(dataset, sinkTable.connectors(), option, sinkTable);
        }
    }

    /**
     * Applies the per-connector closure to every connector in {@code seq},
     * keeping {@code dataset} persisted for the duration of the writes.
     *
     * @param dataset   rows to write; persisted on entry and unpersisted on exit
     * @param seq       connectors to write to
     * @param option    optional batch id forwarded to the per-connector closure
     * @param sinkTable sink definition forwarded to the per-connector closure
     */
    private void startMultipleBatch(Dataset<Row> dataset, Seq<Connector> seq, Option<Object> option, SinkTable sinkTable) {
        dataset.persist();
        try {
            seq.foreach(new BatchWriter$$anonfun$startMultipleBatch$1(this, dataset, option, sinkTable));
        } finally {
            // Release the cached data even when a connector write throws; otherwise
            // the dataset would stay persisted after the failure propagates.
            dataset.unpersist();
        }
    }

    /**
     * Writes {@code dataset} to a single connector.
     *
     * <p>The writer is configured with the connector's name as the format and
     * the connector's options. The sink table's update mode, partitions and
     * bucket settings are then each applied to the writer through their
     * closures, with a connector-derived alternative supplied via
     * {@code orElse} when the sink table value is absent, before
     * {@code save()} is invoked.
     *
     * <p>The method is public with a mangled name because the Scala compiler
     * exposes it to the closure classes.
     *
     * @param dataset   rows to write
     * @param connector target connector
     * @param option    optional batch id; not read by the code visible in this method
     * @param sinkTable sink definition supplying update mode, partitions and bucket settings
     */
    public void com$qiniu$stream$core$sink$BatchWriter$$startOneBatch(Dataset<Row> dataset, Connector connector, Option<Object> option, SinkTable sinkTable) {
        logInfo(new BatchWriter$$anonfun$com$qiniu$stream$core$sink$BatchWriter$$startOneBatch$1(this, connector));
        DataFrameWriter options = dataset.write().format(connector.name()).options(connector.options());
        // NOTE(review): closures $2/$4/$6 presumably read the fallback setting from the
        // connector and $3/$5/$7 apply it to the writer; their bodies are not in this file.
        sinkTable.updateMode().orElse(new BatchWriter$$anonfun$com$qiniu$stream$core$sink$BatchWriter$$startOneBatch$2(this, connector)).foreach(new BatchWriter$$anonfun$com$qiniu$stream$core$sink$BatchWriter$$startOneBatch$3(this, options));
        sinkTable.partitions().orElse(new BatchWriter$$anonfun$com$qiniu$stream$core$sink$BatchWriter$$startOneBatch$4(this, connector)).foreach(new BatchWriter$$anonfun$com$qiniu$stream$core$sink$BatchWriter$$startOneBatch$5(this, options));
        sinkTable.bucket().orElse(new BatchWriter$$anonfun$com$qiniu$stream$core$sink$BatchWriter$$startOneBatch$6(this, connector)).foreach(new BatchWriter$$anonfun$com$qiniu$stream$core$sink$BatchWriter$$startOneBatch$7(this, options));
        options.save();
    }

    /** Creates a writer and runs the {@link Logging} trait initializer on it. */
    public BatchWriter() {
        Logging.class.$init$(this);
    }
}
