package com.hivemq.mqtt.handler.publish;

import com.codahale.metrics.Counter;
import com.hivemq.configuration.service.InternalConfigurations;
import com.hivemq.extension.sdk.api.annotations.NotNull;
import com.hivemq.extension.sdk.api.annotations.Nullable;
import com.hivemq.metrics.MetricsHolder;
import com.hivemq.mqtt.message.publish.PublishWithFuture;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.LinkedList;
import java.util.List;

/* loaded from: input_file:com/hivemq/mqtt/handler/publish/PublishFlushHandler.class */
public class PublishFlushHandler extends ChannelInboundHandlerAdapter implements Runnable {

    @Nullable
    private ChannelHandlerContext ctx;

    @NotNull
    private final Counter channelNotWritable;
    static final /* synthetic */ boolean $assertionsDisabled;

    @NotNull
    private final LinkedList<PublishWithFuture> messagesToWrite = new LinkedList<>();
    private boolean wasWritable = true;
    private final int maxWritesBeforeFlush = InternalConfigurations.MAX_PUBLISHES_BEFORE_FLUSH.get();

    public PublishFlushHandler(@NotNull MetricsHolder metricsHolder) {
        this.channelNotWritable = metricsHolder.getChannelNotWritableCounter();
    }

    public void handlerAdded(@NotNull ChannelHandlerContext channelHandlerContext) {
        this.ctx = channelHandlerContext;
    }

    public void channelWritabilityChanged(@NotNull ChannelHandlerContext channelHandlerContext) {
        Channel channel = channelHandlerContext.channel();
        if (channel.isWritable() && !this.wasWritable) {
            this.wasWritable = true;
            this.channelNotWritable.dec();
            channel.eventLoop().execute(this);
        }
        channelHandlerContext.fireChannelWritabilityChanged();
    }

    public void channelInactive(@NotNull ChannelHandlerContext channelHandlerContext) throws Exception {
        handleChannelInactiveState();
        super.channelInactive(channelHandlerContext);
    }

    private void handleChannelInactiveState() {
        while (!this.messagesToWrite.isEmpty()) {
            this.messagesToWrite.poll().getFuture().set(PublishStatus.NOT_CONNECTED);
        }
    }

    public void sendPublishes(@NotNull List<PublishWithFuture> list) {
        if (!$assertionsDisabled && this.ctx == null) {
            throw new AssertionError("ctx can not be null because sendPublishes is called after handlerAdded");
        }
        this.ctx.channel().eventLoop().execute(() -> {
            this.messagesToWrite.addAll(list);
            if (this.ctx.channel().isActive()) {
                consumeQueue();
            } else {
                handleChannelInactiveState();
            }
        });
    }

    @Override // java.lang.Runnable
    public void run() {
        consumeQueue();
    }

    private void consumeQueue() {
        if (!$assertionsDisabled && this.ctx == null) {
            throw new AssertionError("ctx can not be null because consumeQueue is called after handlerAdded");
        }
        int i = 0;
        while (true) {
            if (this.messagesToWrite.isEmpty()) {
                break;
            }
            if (this.ctx.channel().isWritable()) {
                PublishWithFuture poll = this.messagesToWrite.poll();
                this.ctx.write(poll).addListener(new PublishWriteFailedListener(poll.getFuture()));
                i++;
                if (i >= this.maxWritesBeforeFlush) {
                    this.ctx.flush();
                    i = 0;
                }
            } else if (this.wasWritable) {
                this.wasWritable = false;
                this.channelNotWritable.inc();
            }
        }
        if (i > 0) {
            this.ctx.flush();
        }
    }

    static {
        $assertionsDisabled = !PublishFlushHandler.class.desiredAssertionStatus();
    }
}
