package com.github.davidfantasy.flink.connector.mqtt.source;

import com.github.davidfantasy.flink.connector.mqtt.MqttProperties;
import com.github.davidfantasy.flink.connector.mqtt.MqttTopic;
import java.util.List;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.table.data.RowData;

/**
 * Flink {@link Source} that emits {@link RowData} records read from MQTT topics.
 *
 * <p>The source is always {@link Boundedness#CONTINUOUS_UNBOUNDED}. It holds the MQTT
 * connection properties and the list of topics, and hands both to the
 * {@link MqttSourceEnumerator} it creates.
 */
public class MqttSource implements Source<RowData, MqttSourceSplit, MqttSplitsCheckpoint> {

    /** MQTT settings forwarded to every enumerator created by this source. */
    private MqttProperties mqttProperties;

    /** Topics forwarded to every enumerator created by this source; never empty. */
    private List<MqttTopic> topics;

    /**
     * Creates a source for the given MQTT settings and topics.
     *
     * @param mqttProperties MQTT settings; must not be {@code null}
     * @param list topics to read from; must not be {@code null} or empty
     * @throws IllegalStateException if {@code mqttProperties} is {@code null}, or
     *     {@code list} is {@code null} or empty (the message reads "MQTT configuration
     *     is missing" in Chinese)
     */
    public MqttSource(MqttProperties mqttProperties, List<MqttTopic> list) {
        if (mqttProperties == null || list == null || list.isEmpty()) {
            throw new IllegalStateException("MQTT配置信息缺失");
        }
        this.mqttProperties = mqttProperties;
        this.topics = list;
    }

    /**
     * Reports the boundedness of this source.
     *
     * @return always {@link Boundedness#CONTINUOUS_UNBOUNDED}
     */
    public Boundedness getBoundedness() {
        return Boundedness.CONTINUOUS_UNBOUNDED;
    }

    /**
     * Creates a fresh split enumerator.
     *
     * @param splitEnumeratorContext context passed through to the enumerator
     * @return a new {@link MqttSourceEnumerator} built from the context, the MQTT
     *     properties and the topics of this source
     * @throws Exception declared by the {@link Source} contract
     */
    public SplitEnumerator<MqttSourceSplit, MqttSplitsCheckpoint> createEnumerator(
            SplitEnumeratorContext<MqttSourceSplit> splitEnumeratorContext) throws Exception {
        return new MqttSourceEnumerator(splitEnumeratorContext, this.mqttProperties, this.topics);
    }

    /**
     * Creates a split enumerator when restoring from a checkpoint.
     *
     * <p>The enumerator is built exactly as in {@link #createEnumerator}: the supplied
     * checkpoint is not used.
     *
     * @param splitEnumeratorContext context passed through to the enumerator
     * @param mqttSplitsCheckpoint previously stored enumerator state; currently ignored
     * @return a new {@link MqttSourceEnumerator} built from the context, the MQTT
     *     properties and the topics of this source
     * @throws Exception declared by the {@link Source} contract
     */
    public SplitEnumerator<MqttSourceSplit, MqttSplitsCheckpoint> restoreEnumerator(
            SplitEnumeratorContext<MqttSourceSplit> splitEnumeratorContext,
            MqttSplitsCheckpoint mqttSplitsCheckpoint) throws Exception {
        // NOTE(review): the checkpoint is dropped here, so restore behaves like a fresh start.
        // Presumably intentional because splits are re-derived from the topics; confirm
        // against MqttSourceEnumerator.
        return new MqttSourceEnumerator(splitEnumeratorContext, this.mqttProperties, this.topics);
    }

    /**
     * Supplies the serializer for {@link MqttSourceSplit} instances.
     *
     * @return a new {@link MqttSourceSplitSerializer}
     */
    public SimpleVersionedSerializer<MqttSourceSplit> getSplitSerializer() {
        return new MqttSourceSplitSerializer();
    }

    /**
     * Supplies the serializer for the enumerator checkpoint type.
     *
     * @return a new {@link MqttSourceEnumeratorCheckpointSerializer}
     */
    public SimpleVersionedSerializer<MqttSplitsCheckpoint> getEnumeratorCheckpointSerializer() {
        return new MqttSourceEnumeratorCheckpointSerializer();
    }

    /**
     * Creates a reader for this source.
     *
     * @param sourceReaderContext context passed through to the reader
     * @return a new {@link MqttSourceReader} built from the context only
     * @throws Exception declared by the {@link Source} contract
     */
    public SourceReader<RowData, MqttSourceSplit> createReader(SourceReaderContext sourceReaderContext)
            throws Exception {
        return new MqttSourceReader(sourceReaderContext);
    }

    // The decompiler-emitted raw-typed bridge overload of restoreEnumerator was removed:
    // javac generates that bridge itself, and declaring it by hand clashes with the
    // generated one.
}
