package com.aerospike.kafka.connect.sink;

import com.aerospike.client.AerospikeException;
import com.aerospike.kafka.connect.data.AerospikeRecord;
import com.aerospike.kafka.connect.data.RecordMapperFactory;
import java.util.Collection;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/aerospike/kafka/connect/sink/AerospikeSinkTask.class */
public class AerospikeSinkTask extends SinkTask {
    private static final Logger log = LoggerFactory.getLogger(AerospikeSinkTask.class);
    private RecordMapperFactory mappers;
    private AsyncWriter writer;
    private long lastFlushTimeMillis = 0;
    private Map<TopicPartition, OffsetAndMetadata> lastOffsets;

    public String version() {
        return new AerospikeSinkConnector().version();
    }

    public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
        if (log.isInfoEnabled()) {
            report(map);
        }
        this.writer.flush();
    }

    public void put(Collection<SinkRecord> collection) {
        for (SinkRecord sinkRecord : collection) {
            try {
                AerospikeRecord convertRecord = this.mappers.getMapper(sinkRecord).convertRecord(sinkRecord);
                log.trace("Writing record for key {}: {}", convertRecord.key(), convertRecord.bins());
                this.writer.write(convertRecord);
            } catch (AerospikeException e) {
                log.error("Error writing to record", e);
            }
        }
    }

    public void start(Map<String, String> map) {
        log.trace("Starting {} task with config: {}", getClass().getName(), map);
        ConnectorConfig connectorConfig = new ConnectorConfig(map);
        this.mappers = new RecordMapperFactory(connectorConfig.getTopicConfigs());
        this.writer = new AsyncWriter(connectorConfig);
    }

    public void stop() {
        log.trace("Stopping {} task", getClass().getName());
        if (this.writer != null) {
            this.writer.close();
        }
    }

    private void report(Map<TopicPartition, OffsetAndMetadata> map) {
        long currentTimeMillis = System.currentTimeMillis();
        if (this.lastFlushTimeMillis > 0) {
            long j = currentTimeMillis - this.lastFlushTimeMillis;
            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : map.entrySet()) {
                TopicPartition key = entry.getKey();
                OffsetAndMetadata offsetAndMetadata = this.lastOffsets.get(key);
                if (offsetAndMetadata != null) {
                    long offset = entry.getValue().offset() - offsetAndMetadata.offset();
                    log.info("Wrote {} records in {} ms for topic {}, partition {} - throughput: {} TPS", new Object[]{Long.valueOf(offset), Long.valueOf(j), key.topic(), Integer.valueOf(key.partition()), Long.valueOf(Math.round((1000.0d * offset) / j))});
                }
            }
        }
        this.lastFlushTimeMillis = currentTimeMillis;
        this.lastOffsets = map;
    }
}
