package com.github.davidfantasy.flink.connector.mqtt.source;

import com.github.davidfantasy.flink.connector.mqtt.MqttProperties;
import com.github.davidfantasy.flink.connector.mqtt.MqttTopic;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/davidfantasy/flink/connector/mqtt/source/MqttSourceEnumerator.class */
/**
 * Split enumerator for the MQTT source.
 *
 * <p>One {@link MqttSourceSplit} is created per configured {@link MqttTopic} when the
 * enumerator starts. All splits that are pending at the time a reader registers are handed
 * to that reader in one go; splits returned through {@link #addSplitsBack(List, int)} are
 * queued again and handed to the next reader that registers.
 *
 * <p>NOTE(review): no synchronization is used here; this presumes the runtime invokes the
 * enumerator callbacks from a single thread - confirm against the Flink version in use.
 */
public class MqttSourceEnumerator implements SplitEnumerator<MqttSourceSplit, MqttSplitsCheckpoint> {
    private static final Logger log = LoggerFactory.getLogger(MqttSourceEnumerator.class);

    private final SplitEnumeratorContext<MqttSourceSplit> context;
    private final MqttProperties mqttProperties;
    private final List<MqttTopic> topics;

    /**
     * Splits waiting to be assigned to a reader. Starts out empty (never {@code null}) so that
     * callbacks arriving before {@link #start()} cannot fail with a NullPointerException.
     */
    private List<MqttSourceSplit> currentSplits = new ArrayList<>();

    /**
     * Creates the enumerator.
     *
     * @param splitEnumeratorContext context used to look up registered readers and assign splits
     * @param mqttProperties connection properties passed on to every created split
     * @param topics topics to subscribe to; one split is created per entry
     */
    public MqttSourceEnumerator(
            SplitEnumeratorContext<MqttSourceSplit> splitEnumeratorContext,
            MqttProperties mqttProperties,
            List<MqttTopic> topics) {
        this.context = splitEnumeratorContext;
        this.mqttProperties = mqttProperties;
        this.topics = topics;
    }

    /** Builds the initial set of pending splits from the configured topics. */
    public void start() {
        this.currentSplits = createSplits();
    }

    /**
     * Split requests are not served by this enumerator (splits are pushed when a reader
     * registers), so the request is only logged.
     *
     * @param subtaskId subtask id of the requesting reader
     * @param requesterHostname hostname of the requesting reader, may be {@code null}
     */
    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
        log.warn("接受到handleSplitRequest，subtaskId:{}，requesterHostname:{}", subtaskId, requesterHostname);
    }

    /**
     * Puts returned splits back into the pending list. They are not reassigned here; they are
     * handed out the next time a reader registers via {@link #addReader(int)}.
     *
     * @param splits splits being returned
     * @param subtaskId subtask id the splits were previously assigned to
     */
    public void addSplitsBack(List<MqttSourceSplit> splits, int subtaskId) {
        log.warn("MQTT source Enumerator退回了一些分片：{},{}", splits, subtaskId);
        this.currentSplits.addAll(splits);
    }

    /**
     * Logs the newly registered reader and assigns every pending split to it.
     *
     * @param subtaskId subtask id of the reader that registered
     */
    public void addReader(int subtaskId) {
        log.info("添加了一个新的reader,{},{}", subtaskId, this.context.registeredReaders().get(subtaskId));
        assignSplits(subtaskId);
    }

    /**
     * Returns the enumerator state for a checkpoint. The checkpoint object is created with its
     * no-argument constructor; pending splits are not passed to it.
     *
     * @param checkpointId id of the checkpoint being taken (unused)
     * @return a freshly created {@link MqttSplitsCheckpoint}
     * @throws Exception declared for interface compatibility; not thrown by this implementation
     */
    public MqttSplitsCheckpoint snapshotState(long checkpointId) throws Exception {
        return new MqttSplitsCheckpoint();
    }

    /**
     * Alias kept only for code compiled against the decompiler-mangled method name.
     *
     * @param checkpointId id of the checkpoint being taken (unused)
     * @return same result as {@link #snapshotState(long)}
     * @throws Exception as declared by {@link #snapshotState(long)}
     * @deprecated use {@link #snapshotState(long)}
     */
    @Deprecated
    public MqttSplitsCheckpoint m4snapshotState(long checkpointId) throws Exception {
        return snapshotState(checkpointId);
    }

    /** Nothing to release: this enumerator holds no closeable resources. */
    public void close() throws IOException {
    }

    /**
     * Creates one split per configured topic. Split ids are the 1-based position of the topic
     * in the topic list, rendered as a string.
     *
     * @return a new mutable list of splits
     */
    private List<MqttSourceSplit> createSplits() {
        List<MqttSourceSplit> splits = new ArrayList<>();
        int splitIndex = 0;
        for (MqttTopic mqttTopic : this.topics) {
            splitIndex++;
            splits.add(new MqttSourceSplit(
                    String.valueOf(splitIndex), mqttTopic.getTopic(), mqttTopic.getQos(), this.mqttProperties));
        }
        return splits;
    }

    /**
     * Assigns every pending split to the given reader and then empties the pending list so the
     * same split is not handed out twice.
     *
     * @param subtaskId subtask id of the reader receiving the splits
     */
    private void assignSplits(int subtaskId) {
        for (MqttSourceSplit split : this.currentSplits) {
            this.context.assignSplit(split, subtaskId);
        }
        this.currentSplits.clear();
    }
}
