package com.agoda.kafka.connector.jdbc;

import com.agoda.kafka.connector.jdbc.models.Mode;
import com.agoda.kafka.connector.jdbc.models.Mode$IncrementingMode$;
import com.agoda.kafka.connector.jdbc.models.Mode$TimestampIncrementingMode$;
import com.agoda.kafka.connector.jdbc.models.Mode$TimestampMode$;
import com.agoda.kafka.connector.jdbc.services.DataService;
import com.agoda.kafka.connector.jdbc.services.IdBasedDataService;
import com.agoda.kafka.connector.jdbc.services.TimeBasedDataService;
import com.agoda.kafka.connector.jdbc.services.TimeIdBasedDataService;
import com.agoda.kafka.connector.jdbc.utils.Version$;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.MatchError;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.StringOps;
import scala.concurrent.duration.package;
import scala.concurrent.duration.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try$;

/* compiled from: JdbcSourceTask.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005=b\u0001B\u0001\u0003\u00015\u0011aB\u00133cGN{WO]2f)\u0006\u001c8N\u0003\u0002\u0004\t\u0005!!\u000e\u001a2d\u0015\t)a!A\u0005d_:tWm\u0019;pe*\u0011q\u0001C\u0001\u0006W\u000647.\u0019\u0006\u0003\u0013)\tQ!Y4pI\u0006T\u0011aC\u0001\u0004G>l7\u0001A\n\u0003\u00019\u0001\"aD\r\u000e\u0003AQ!!\u0005\n\u0002\rM|WO]2f\u0015\t\u0019B#A\u0004d_:tWm\u0019;\u000b\u0005\u001d)\"B\u0001\f\u0018\u0003\u0019\t\u0007/Y2iK*\t\u0001$A\u0002pe\u001eL!A\u0007\t\u0003\u0015M{WO]2f)\u0006\u001c8\u000eC\u0003\u001d\u0001\u0011\u0005Q$\u0001\u0004=S:LGO\u0010\u000b\u0002=A\u0011q\u0004A\u0007\u0002\u0005!9\u0011\u0005\u0001b\u0001\n\u0013\u0011\u0013A\u00027pO\u001e,'/F\u0001$!\t!s%D\u0001&\u0015\t1s#A\u0003tY\u001a$$.\u0003\u0002)K\t1Aj\\4hKJDaA\u000b\u0001!\u0002\u0013\u0019\u0013a\u00027pO\u001e,'\u000f\t\u0005\nY\u0001\u0001\r\u00111A\u0005\n5\naaY8oM&<W#\u0001\u0018\u0011\u0005}y\u0013B\u0001\u0019\u0003\u0005QQEMY2T_V\u00148-\u001a+bg.\u001cuN\u001c4jO\"I!\u0007\u0001a\u0001\u0002\u0004%IaM\u0001\u000bG>tg-[4`I\u0015\fHC\u0001\u001b;!\t)\u0004(D\u00017\u0015\u00059\u0014!B:dC2\f\u0017BA\u001d7\u0005\u0011)f.\u001b;\t\u000fm\n\u0014\u0011!a\u0001]\u0005\u0019\u0001\u0010J\u0019\t\ru\u0002\u0001\u0015)\u0003/\u0003\u001d\u0019wN\u001c4jO\u0002B\u0011b\u0010\u0001A\u0002\u0003\u0007I\u0011\u0002!\u0002\u0005\u0011\u0014W#A!\u0011\u0005\t;U\"A\"\u000b\u0005\u0011+\u0015aA:rY*\ta)\u0001\u0003kCZ\f\u0017B\u0001%D\u0005)\u0019uN\u001c8fGRLwN\u001c\u0005\n\u0015\u0002\u0001\r\u00111A\u0005\n-\u000ba\u0001\u001a2`I\u0015\fHC\u0001\u001bM\u0011\u001dY\u0014*!AA\u0002\u0005CaA\u0014\u0001!B\u0013\t\u0015a\u00013cA!I\u0001\u000b\u0001a\u0001\u0002\u0004%I!U\u0001\fI\u0006$\u0018mU3sm&\u001cW-F\u0001S!\t\u0019f+D\u0001U\u0015\t)&!\u0001\u0005tKJ4\u0018nY3t\u0013\t9FKA\u0006ECR\f7+\u001a:wS\u000e,\u0007\"C-\u0001\u0001\u0004\u0005\r\u0011\"\u0003[\u0003=!\u0017\r^1TKJ4\u0018nY3`I\u0015\fHC\u0001\u001b\\\u0011\u001dY\u0004,!AA\u0002ICa!\u0018\u0001!B\u0013\u0011\u0016\u0001\u00043bi\u0006\u001cVM\u001d<jG\u0016\u0004\u0003\"C0\u0001\u0001\u0004\u0005\r\u0011\"\u0003a\u0003\u001d\u0011XO\u001c8j]\u001e,\u0012!\u0019\t\u0003E&l\u0011a\u0019\u0006\u0003I\u0016\fa!\u0019;p[&\u001c'B\u00014h\u0003)\u0019wN\\2veJ,g\u000e\u001e\u0006\u0003Q\u0016\u000bA!\u001e;jY&\u0011!n\u0019\u0002\u000e\u0003R|W.[2C_>dW-\u00198\t\u00131\u0004\u0001\u0019!a\u0001\n\u0013i\u0017a\u0003:v]:LgnZ0%KF$\"\u0001\u000e8\t\u000fmZ\u0017\u0011!a\u0001C\"1\u0001\u000f\u0001Q!\n\u0005\f\u0001B];o]&tw\r\t\u0005\u0006e\u0002!\te]\u0001\bm\u0016\u00148/[8o)\u0005!\bCA;}\u001d\t1(\u0010\u0005\u0002xm5\t\u0001P\u0003\u0002z\u0019\u00051AH]8pizJ!a\u001f\u001c\u0002\rA\u0013X\rZ3g\u0013\tihP\u0001\u0004TiJLgn\u001a\u0006\u0003wZBq!!\u0001\u0001\t\u0003\n\u0019!A\u0003ti\u0006\u0014H\u000fF\u00025\u0003\u000bAq!a\u0002��\u0001\u0004\tI!A\u0003qe>\u00048\u000f\u0005\u0004\u0002\f\u00055A\u000f^\u0007\u0002O&\u0019\u0011qB4\u0003\u00075\u000b\u0007\u000fC\u0004\u0002\u0014\u0001!\t%!\u0006\u0002\tM$x\u000e\u001d\u000b\u0002i!9\u0011\u0011\u0004\u0001\u0005B\u0005m\u0011\u0001\u00029pY2$\"!!\b\u0011\r\u0005-\u0011qDA\u0012\u0013\r\t\tc\u001a\u0002\u0005\u0019&\u001cH\u000fE\u0002\u0010\u0003KI1!a\n\u0011\u00051\u0019v.\u001e:dKJ+7m\u001c:e\u0011\u001d\tY\u0003\u0001C\u0005\u0003[\tABZ3uG\"\u0014VmY8sIN,\"!!\b")
/* loaded from: input_file:com/agoda/kafka/connector/jdbc/JdbcSourceTask.class */
public class JdbcSourceTask extends SourceTask {
    private final Logger logger = LoggerFactory.getLogger(JdbcSourceTask.class);
    private JdbcSourceTaskConfig config;
    private Connection db;
    private DataService dataService;
    private AtomicBoolean running;

    private Logger logger() {
        return this.logger;
    }

    private JdbcSourceTaskConfig config() {
        return this.config;
    }

    private void config_$eq(JdbcSourceTaskConfig jdbcSourceTaskConfig) {
        this.config = jdbcSourceTaskConfig;
    }

    private Connection db() {
        return this.db;
    }

    private void db_$eq(Connection connection) {
        this.db = connection;
    }

    private DataService dataService() {
        return this.dataService;
    }

    private void dataService_$eq(DataService dataService) {
        this.dataService = dataService;
    }

    private AtomicBoolean running() {
        return this.running;
    }

    private void running_$eq(AtomicBoolean atomicBoolean) {
        this.running = atomicBoolean;
    }

    public String version() {
        return Version$.MODULE$.getVersion();
    }

    public void start(Map<String, String> map) {
        Success apply = Try$.MODULE$.apply(() -> {
            return new JdbcSourceTaskConfig(((TraversableOnce) JavaConverters$.MODULE$.mapAsScalaMapConverter(map).asScala()).toMap(Predef$.MODULE$.$conforms()));
        });
        if (apply instanceof Success) {
            config_$eq((JdbcSourceTaskConfig) apply.value());
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            if (!(apply instanceof Failure)) {
                throw new MatchError(apply);
            }
            logger().error("Couldn't start com.agoda.kafka.connector.jdbc.JdbcSourceTask due to configuration error", new ConnectException(((Failure) apply).exception()));
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        String connectionUrl = config().getConnectionUrl();
        logger().debug(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Trying to connect to ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{connectionUrl})));
        boolean z = false;
        Failure failure = null;
        Success apply2 = Try$.MODULE$.apply(() -> {
            return DriverManager.getConnection(connectionUrl);
        });
        if (!(apply2 instanceof Success)) {
            if (apply2 instanceof Failure) {
                z = true;
                failure = (Failure) apply2;
                Throwable exception = failure.exception();
                if (exception instanceof SQLException) {
                    SQLException sQLException = (SQLException) exception;
                    logger().error(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Couldn't open connection to ", " : "})).s(Predef$.MODULE$.genericWrapArray(new Object[]{connectionUrl})), sQLException);
                    throw new ConnectException(sQLException);
                }
            }
            if (!z) {
                throw new MatchError(apply2);
            }
            Throwable exception2 = failure.exception();
            logger().error(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Couldn't open connection to ", " : "})).s(Predef$.MODULE$.genericWrapArray(new Object[]{connectionUrl})), exception2);
            throw exception2;
        }
        db_$eq((Connection) apply2.value());
        BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        Map offset = this.context.offsetStorageReader().offset((Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(JdbcSourceConnectorConstants$.MODULE$.STORED_PROCEDURE_NAME_KEY()), config().getStoredProcedureName())}))).asJava());
        String storedProcedureName = config().getStoredProcedureName();
        Option<String> timestampVariableName = config().getTimestampVariableName();
        Option<String> timestampFieldName = config().getTimestampFieldName();
        Option<String> incrementingVariableName = config().getIncrementingVariableName();
        Option<String> incrementingFieldName = config().getIncrementingFieldName();
        int maxBatchSize = config().getMaxBatchSize();
        String maxBatchSizeVariableName = config().getMaxBatchSizeVariableName();
        String topic = config().getTopic();
        Option<String> keyField = config().getKeyField();
        Mode mode = config().getMode();
        if (Mode$TimestampMode$.MODULE$.equals(mode)) {
            dataService_$eq(new TimeBasedDataService(storedProcedureName, maxBatchSize, maxBatchSizeVariableName, (String) timestampVariableName.get(), BoxesRunTime.unboxToLong(Try$.MODULE$.apply(() -> {
                return offset.get(Mode$TimestampMode$.MODULE$.entryName());
            }).map(obj -> {
                return BoxesRunTime.boxToLong($anonfun$start$4(obj));
            }).getOrElse(() -> {
                return this.config().getTimestampOffset();
            })), (String) timestampFieldName.get(), topic, keyField));
            BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
        } else if (Mode$IncrementingMode$.MODULE$.equals(mode)) {
            dataService_$eq(new IdBasedDataService(storedProcedureName, maxBatchSize, maxBatchSizeVariableName, (String) incrementingVariableName.get(), BoxesRunTime.unboxToLong(Try$.MODULE$.apply(() -> {
                return offset.get(Mode$IncrementingMode$.MODULE$.entryName());
            }).map(obj2 -> {
                return BoxesRunTime.boxToLong($anonfun$start$7(obj2));
            }).getOrElse(() -> {
                return this.config().getIncrementingOffset();
            })), (String) incrementingFieldName.get(), topic, keyField));
            BoxedUnit boxedUnit5 = BoxedUnit.UNIT;
        } else {
            if (!Mode$TimestampIncrementingMode$.MODULE$.equals(mode)) {
                throw new MatchError(mode);
            }
            dataService_$eq(new TimeIdBasedDataService(storedProcedureName, maxBatchSize, maxBatchSizeVariableName, (String) timestampVariableName.get(), BoxesRunTime.unboxToLong(Try$.MODULE$.apply(() -> {
                return offset.get(Mode$TimestampMode$.MODULE$.entryName());
            }).map(obj3 -> {
                return BoxesRunTime.boxToLong($anonfun$start$10(obj3));
            }).getOrElse(() -> {
                return this.config().getTimestampOffset();
            })), (String) incrementingVariableName.get(), BoxesRunTime.unboxToLong(Try$.MODULE$.apply(() -> {
                return offset.get(Mode$IncrementingMode$.MODULE$.entryName());
            }).map(obj4 -> {
                return BoxesRunTime.boxToLong($anonfun$start$13(obj4));
            }).getOrElse(() -> {
                return this.config().getIncrementingOffset();
            })), (String) timestampFieldName.get(), (String) incrementingFieldName.get(), topic, keyField));
            BoxedUnit boxedUnit6 = BoxedUnit.UNIT;
        }
        running_$eq(new AtomicBoolean(true));
    }

    public void stop() {
        if (running() != null) {
            running().set(false);
        }
        if (db() != null) {
            logger().debug("Trying to close database connection");
            Failure apply = Try$.MODULE$.apply(() -> {
                this.db().close();
            });
            if (apply instanceof Success) {
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            } else {
                if (!(apply instanceof Failure)) {
                    throw new MatchError(apply);
                }
                logger().error("Failed to close database connection: ", apply.exception());
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            }
        }
    }

    public synchronized List<SourceRecord> poll() {
        if (running().get()) {
            return fetchRecords();
        }
        return null;
    }

    private List<SourceRecord> fetchRecords() {
        Throwable exception;
        Seq empty;
        logger().debug("Polling new data ...");
        long pollInterval = config().getPollInterval();
        long currentTimeMillis = System.currentTimeMillis();
        boolean z = false;
        Failure failure = null;
        Success records = dataService().getRecords(db(), new package.DurationLong(package$.MODULE$.DurationLong(pollInterval)).millis());
        if (records instanceof Success) {
            Seq seq = (Seq) records.value();
            if (seq.isEmpty()) {
                logger().info(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"No updates for ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{dataService()})));
            } else {
                logger().info(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Returning ", " records for ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(seq.size()), dataService()})));
            }
            empty = seq;
        } else {
            if (records instanceof Failure) {
                z = true;
                failure = (Failure) records;
                Throwable exception2 = failure.exception();
                if (exception2 instanceof SQLException) {
                    logger().error(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Failed to fetch data for ", ": "})).s(Predef$.MODULE$.genericWrapArray(new Object[]{dataService()})), (SQLException) exception2);
                    empty = (Seq) Seq$.MODULE$.empty();
                }
            }
            if (!z || (exception = failure.exception()) == null) {
                throw new MatchError(records);
            }
            logger().error(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Failed to fetch data for ", ": "})).s(Predef$.MODULE$.genericWrapArray(new Object[]{dataService()})), exception);
            empty = Seq$.MODULE$.empty();
        }
        Seq seq2 = empty;
        long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
        if (currentTimeMillis2 < pollInterval) {
            Thread.sleep(pollInterval - currentTimeMillis2);
        }
        return (List) JavaConverters$.MODULE$.seqAsJavaListConverter(seq2).asJava();
    }

    public static final /* synthetic */ long $anonfun$start$4(Object obj) {
        return new StringOps(Predef$.MODULE$.augmentString(obj.toString())).toLong();
    }

    public static final /* synthetic */ long $anonfun$start$7(Object obj) {
        return new StringOps(Predef$.MODULE$.augmentString(obj.toString())).toLong();
    }

    public static final /* synthetic */ long $anonfun$start$10(Object obj) {
        return new StringOps(Predef$.MODULE$.augmentString(obj.toString())).toLong();
    }

    public static final /* synthetic */ long $anonfun$start$13(Object obj) {
        return new StringOps(Predef$.MODULE$.augmentString(obj.toString())).toLong();
    }
}
