package com.qiniu.stream.core.source;

import com.qiniu.stream.core.config.RowTime;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.LongType$;
import org.apache.spark.sql.types.TimestampType$;
import scala.Option;
import scala.Predef$;
import scala.Serializable;
import scala.Some;
import scala.StringContext;
import scala.runtime.AbstractFunction1;

/* compiled from: WaterMarker.scala */
/* loaded from: input_file:com/qiniu/stream/core/source/WaterMarker$$anonfun$withWaterMark$1.class */
public final class WaterMarker$$anonfun$withWaterMark$1 extends AbstractFunction1<RowTime, Dataset<Row>> implements Serializable {
    public static final long serialVersionUID = 0;
    private final /* synthetic */ WaterMarker $outer;
    private final Dataset dataFrame$1;

    public final Dataset<Row> apply(RowTime rowTime) {
        Dataset<Row> dataset;
        Dataset<Row> dataset2;
        if (rowTime != null) {
            String fromField = rowTime.fromField();
            String eventTime = rowTime.eventTime();
            String delayThreadsHold = rowTime.delayThreadsHold();
            boolean z = false;
            Some some = null;
            Option map = Predef$.MODULE$.refArrayOps(this.dataFrame$1.schema().fields()).find(new WaterMarker$$anonfun$withWaterMark$1$$anonfun$1(this, fromField)).map(new WaterMarker$$anonfun$withWaterMark$1$$anonfun$2(this));
            if (map instanceof Some) {
                z = true;
                some = (Some) map;
                if (LongType$.MODULE$.equals((DataType) some.x())) {
                    this.$outer.log().info(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"adding watermark. eventTime: ", ", type: Long"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{eventTime})));
                    dataset2 = this.dataFrame$1.withColumn(eventTime, this.dataFrame$1.col(fromField).cast(DataTypes.TimestampType)).withWatermark(eventTime, delayThreadsHold);
                    dataset = dataset2;
                }
            }
            if (z && TimestampType$.MODULE$.equals((DataType) some.x())) {
                this.$outer.log().info(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"adding watermark. eventTime: ", ", type: Timestamp"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{eventTime})));
                dataset2 = (eventTime != null ? !eventTime.equals(fromField) : fromField != null) ? this.dataFrame$1.withColumn(eventTime, this.dataFrame$1.col(fromField)).withWatermark(eventTime, delayThreadsHold) : this.dataFrame$1.withWatermark(eventTime, delayThreadsHold);
            } else {
                this.$outer.log().warn(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"adding watermark. eventTime: ", ", type: unknown, no watermark will be added"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{eventTime})));
                dataset2 = this.dataFrame$1;
            }
            dataset = dataset2;
        } else {
            this.$outer.log().warn("can't find watermark column, no watermark will be added");
            dataset = this.dataFrame$1;
        }
        return dataset;
    }

    public WaterMarker$$anonfun$withWaterMark$1(WaterMarker waterMarker, Dataset dataset) {
        if (waterMarker == null) {
            throw null;
        }
        this.$outer = waterMarker;
        this.dataFrame$1 = dataset;
    }
}
