package com.github.davidfantasy.flink.connector.mqtt.source;

import com.github.davidfantasy.flink.connector.mqtt.DebounceTask;
import com.github.davidfantasy.flink.connector.mqtt.MqttMessage;
import com.github.davidfantasy.flink.connector.mqtt.MqttProperties;
import com.hivemq.client.mqtt.MqttClient;
import com.hivemq.client.mqtt.MqttClientConfig;
import com.hivemq.client.mqtt.datatypes.MqttClientIdentifier;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient;
import com.hivemq.client.mqtt.mqtt3.message.auth.Mqtt3SimpleAuth;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/davidfantasy/flink/connector/mqtt/source/MqttSourceSplitReader.class */
public class MqttSourceSplitReader implements SplitReader<RowData, MqttSourceSplit> {
    private static final Logger log = LoggerFactory.getLogger(MqttSourceSplitReader.class);
    private List<Mqtt3AsyncClient> mqttClients;
    private final String MQTT_CLIENTID_PREFIX = "flink-mqtt-connector:";
    private final long MSG_CACHE_LIMIT = 100000;
    private final List<MqttSourceSplit> splits = new ArrayList();
    private BlockingQueue<MqttMessage> messageQueue = new LinkedBlockingQueue(10000);
    private final DebounceTask warningPrinter = DebounceTask.build(() -> {
        log.warn("消息缓存队列已满，数据将被丢弃");
    }, 5000L);

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v27, types: [java.util.List] */
    public RecordsWithSplitIds<RowData> fetch() throws IOException {
        ArrayList arrayList;
        if (this.splits.size() == 0) {
            return new MqttRecords(null, null);
        }
        initMqttClient();
        String id = this.splits.get(0).getId();
        try {
            MqttMessage take = this.messageQueue.take();
            if (this.messageQueue.isEmpty()) {
                arrayList = Collections.singletonList(take);
            } else {
                arrayList = new ArrayList(this.messageQueue.size() + 1);
                arrayList.add(take);
                arrayList.addAll(this.messageQueue);
            }
            this.messageQueue.clear();
            return new MqttRecords(id, arrayList.iterator());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public void handleSplitsChanges(SplitsChange<MqttSourceSplit> splitsChange) {
        if (!(splitsChange instanceof SplitsAddition)) {
            throw new UnsupportedOperationException(String.format("The SplitChange type of %s is not supported.", splitsChange.getClass()));
        }
        this.splits.addAll(splitsChange.splits());
    }

    public void wakeUp() {
    }

    public void close() throws Exception {
        this.mqttClients.forEach(mqtt3AsyncClient -> {
            try {
                mqtt3AsyncClient.disconnect().get(3000L, TimeUnit.MILLISECONDS);
            } catch (Exception e) {
                log.error("关闭客户端连接时发生错误：{}", e.getMessage());
            }
        });
    }

    private void initMqttClient() {
        if (this.mqttClients != null) {
            return;
        }
        this.mqttClients = new ArrayList();
        try {
            for (MqttSourceSplit mqttSourceSplit : this.splits) {
                Mqtt3AsyncClient buildMqttClient = buildMqttClient(mqttSourceSplit);
                buildMqttClient.connect().get(5000L, TimeUnit.MILLISECONDS);
                buildMqttClient.subscribeWith().topicFilter(mqttSourceSplit.getTopic()).qos(convertMqttQos(mqttSourceSplit.getQos())).callback(this::handleReceivedMsg).send();
                this.mqttClients.add(buildMqttClient);
            }
        } catch (Exception e) {
            throw new IllegalStateException("初始化mqtt 客户端发生错误", e);
        }
    }

    private Mqtt3AsyncClient buildMqttClient(MqttSourceSplit mqttSourceSplit) {
        MqttProperties mqttProperties = mqttSourceSplit.getMqttProperties();
        return MqttClient.builder().useMqttVersion3().serverHost(mqttProperties.getHost()).serverPort(mqttProperties.getPort().intValue()).simpleAuth(Mqtt3SimpleAuth.builder().username(mqttProperties.getUsername()).password(mqttProperties.getPassword().getBytes(StandardCharsets.UTF_8)).build()).automaticReconnectWithDefaultConfig().identifier("flink-mqtt-connector:" + mqttSourceSplit.getId()).addConnectedListener(mqttClientConnectedContext -> {
            MqttClientConfig clientConfig = mqttClientConnectedContext.getClientConfig();
            log.info("mqtt客户端{}连接成功，连接地址：{}，端口：{}", new Object[]{clientConfig.getClientIdentifier().isPresent() ? ((MqttClientIdentifier) clientConfig.getClientIdentifier().get()).toString() : "", clientConfig.getServerHost(), Integer.valueOf(clientConfig.getServerPort())});
        }).addDisconnectedListener(mqttClientDisconnectedContext -> {
            MqttClientConfig clientConfig = mqttClientDisconnectedContext.getClientConfig();
            log.warn("mqtt客户端{}连接丢失 - {}", new Object[]{clientConfig.getClientIdentifier().isPresent() ? ((MqttClientIdentifier) clientConfig.getClientIdentifier().get()).toString() : "", mqttClientDisconnectedContext.getCause().getMessage(), mqttClientDisconnectedContext.getCause()});
        }).buildAsync();
    }

    private MqttQos convertMqttQos(Integer num) {
        return num == null ? MqttQos.AT_MOST_ONCE : MqttQos.fromCode(num.intValue());
    }

    private void handleReceivedMsg(Mqtt3Publish mqtt3Publish) {
        MqttMessage mqttMessage = new MqttMessage(mqtt3Publish.getTopic().toString(), Integer.valueOf(mqtt3Publish.getQos().getCode()), mqtt3Publish.getPayloadAsBytes(), RowKind.INSERT);
        if (this.messageQueue.size() > 100000) {
            this.warningPrinter.doTask();
        } else {
            this.messageQueue.add(mqttMessage);
        }
    }
}
