package com.github.hyjay.mqtt.actor;

import cats.effect.ContextShift;
import cats.effect.IO;
import cats.effect.IO$;
import cats.effect.Timer;
import com.github.hyjay.mqtt.core.MqttClient;
import com.github.hyjay.mqtt.core.MqttPacketSender;
import com.github.hyjay.mqtt.core.PINGREQ;
import com.github.hyjay.mqtt.netty.NettyMqttClient;
import fs2.Stream;
import fs2.Stream$;
import fs2.Stream$Compiler$;
import fs2.internal.FreeC;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function0;
import scala.MatchError;
import scala.Tuple2;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.BoxedUnit;

/* compiled from: MqttActor.scala */
/* loaded from: input_file:com/github/hyjay/mqtt/actor/MqttActor$.class */
/**
 * Singleton holder for the MQTT actor entry points ({@link #run} and {@link #runForever}).
 *
 * <p>NOTE(review): this looks like decompiler output for a Scala {@code object MqttActor}
 * (see the "compiled from: MqttActor.scala" header). The {@code $}-mangled member names are
 * compiler-generated and may be referenced by sibling generated classes, so they should not
 * be renamed by hand.
 */
public final class MqttActor$ {
    /** The single instance of this class; assigned by the private constructor. */
    public static MqttActor$ MODULE$;
    /** Logger obtained under the name {@code "MqttActor"}. */
    private final Logger com$github$hyjay$mqtt$actor$MqttActor$$logger;

    // Instantiating the class once is what populates MODULE$ (the constructor stores `this`).
    static {
        new MqttActor$();
    }

    /**
     * Returns the logger created in the constructor.
     *
     * <p>Nothing in this class calls this accessor; presumably it exists for generated
     * sibling classes — TODO confirm.
     *
     * @return the {@code "MqttActor"} logger
     */
    public Logger com$github$hyjay$mqtt$actor$MqttActor$$logger() {
        return this.com$github$hyjay$mqtt$actor$MqttActor$$logger;
    }

    /**
     * Connects to the broker described by {@code connectionConfig} and feeds every received
     * packet to {@code mqttActor}.
     *
     * <p>Steps, as written below:
     * <ol>
     *   <li>Derive a {@code ContextShift} and a {@code Timer} from {@code executionContext}.</li>
     *   <li>Create a {@code NettyMqttClient} from the config's host, port and TLS setting and
     *       call {@code connect} with the config's connect packet.</li>
     *   <li>On the connect result ({@code connack}), build a stream that emits {@code connack}
     *       followed by the repeated results of {@code nettyMqttClient.pull(...)}, merged with
     *       the drained keep-alive sending stream from {@code send$1}.</li>
     *   <li>Pass each element of that stream to
     *       {@code mqttActor.onReceived(packet, nettyMqttClient)}, compile and drain the
     *       stream, and expose the result as a {@code Future}.</li>
     * </ol>
     *
     * @param connectionConfig supplies host, port, TLS flag and the connect packet
     * @param mqttActor        receives every element of the incoming stream via {@code onReceived}
     * @param executionContext used for the client, the futures and the IO context shift/timer
     * @param scheduler        not used anywhere in this method body
     * @return a future completing with unit when the drained stream finishes
     * @throws MatchError inside the {@code flatMap} callback if the intermediate tuple is {@code null}
     */
    public Future<BoxedUnit> run(ConnectionConfig connectionConfig, MqttActor mqttActor, ExecutionContext executionContext, Scheduler scheduler) {
        ContextShift contextShift = IO$.MODULE$.contextShift(executionContext);
        Timer timer = IO$.MODULE$.timer(executionContext);
        NettyMqttClient nettyMqttClient = new NettyMqttClient(connectionConfig.host(), connectionConfig.port(), connectionConfig.tls(), executionContext);
        return nettyMqttClient.connect(connectionConfig.connectPacket(), executionContext).map(connack -> {
            // Pair the connack with the merged stream: (connack ++ pulled packets) merged with the drained ping sender.
            return new Tuple2(connack, new Stream(Stream$.MODULE$.merge$extension(Stream$.MODULE$.$plus$plus$extension(Stream$.MODULE$.emit(connack), () -> {
                return new Stream($anonfun$run$6(nettyMqttClient, executionContext));
            }), Stream$.MODULE$.drain$extension(send$1(nettyMqttClient, connectionConfig, timer, contextShift)), IO$.MODULE$.ioConcurrentEffect(contextShift))));
        }, executionContext).flatMap(tuple2 -> {
            // Null check that stands in for a tuple pattern match; only the stream (_2) is used below.
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            // Route every stream element through the actor callback, then run the stream to completion as a Future.
            return ((IO) Stream$.MODULE$.compile$extension(Stream$.MODULE$.through$extension(((Stream) tuple2._2()).fs2$Stream$$free(), obj -> {
                return new Stream($anonfun$run$9(mqttActor, nettyMqttClient, ((Stream) obj).fs2$Stream$$free()));
            }), Stream$Compiler$.MODULE$.syncInstance(IO$.MODULE$.ioConcurrentEffect(contextShift))).drain()).unsafeToFuture().map(boxedUnit -> {
                $anonfun$run$12(boxedUnit);
                return BoxedUnit.UNIT;
            }, executionContext);
        }, executionContext);
    }

    /**
     * Entry point that delegates directly to {@code go$1}, which calls {@link #run} with a
     * freshly obtained actor and attaches a {@code recoverWith} handler.
     *
     * @param connectionConfig forwarded to {@code go$1}
     * @param function0        factory invoked to obtain the {@code MqttActor} for a run
     * @param finiteDuration   forwarded to the recovery handler; presumably the delay before
     *                         a retry — TODO confirm against the handler class
     * @param executionContext forwarded to {@code go$1}
     * @param scheduler        forwarded to the recovery handler
     * @return the future produced by {@code go$1}
     */
    public Future<BoxedUnit> runForever(ConnectionConfig connectionConfig, Function0<MqttActor> function0, FiniteDuration finiteDuration, ExecutionContext executionContext, Scheduler scheduler) {
        return com$github$hyjay$mqtt$actor$MqttActor$$go$1(connectionConfig, function0, executionContext, scheduler, finiteDuration);
    }

    /**
     * Builds the stream of {@code PINGREQ} packets.
     *
     * <p>If the connect packet's keep-alive is zero seconds the stream is empty; otherwise
     * every element produced by {@code Stream.awakeDelay(keepAlive)} is replaced with a new
     * {@code PINGREQ}. The {@code mqttPacketSender} argument is not used in this body.
     */
    private static final FreeC ping$1(MqttPacketSender mqttPacketSender, ConnectionConfig connectionConfig, Timer timer, ContextShift contextShift) {
        return connectionConfig.connectPacket().keepAlive().toSeconds() == 0 ? Stream$.MODULE$.empty() : Stream$.MODULE$.map$extension(Stream$.MODULE$.awakeDelay(connectionConfig.connectPacket().keepAlive(), timer, IO$.MODULE$.ioConcurrentEffect(contextShift)), finiteDuration -> {
            return new PINGREQ();
        });
    }

    /**
     * Synthetic lambda body: for each packet in {@code freeC}, evaluates an {@code IO} that
     * calls {@code mqttClient.send(packet)}.
     */
    public static final /* synthetic */ FreeC $anonfun$run$2(MqttClient mqttClient, FreeC freeC) {
        return Stream$.MODULE$.evalMap$extension(freeC, packet -> {
            return IO$.MODULE$.apply(() -> {
                mqttClient.send(packet);
            });
        });
    }

    /**
     * Pipes the {@code ping$1} stream through {@code $anonfun$run$2}, so that every
     * {@code PINGREQ} it produces is sent with {@code mqttClient}.
     */
    private static final FreeC send$1(MqttClient mqttClient, ConnectionConfig connectionConfig, Timer timer, ContextShift contextShift) {
        return Stream$.MODULE$.through$extension(ping$1(mqttClient, connectionConfig, timer, contextShift), obj -> {
            return new Stream($anonfun$run$2(mqttClient, ((Stream) obj).fs2$Stream$$free()));
        });
    }

    /**
     * Synthetic lambda body: a stream that repeatedly evaluates
     * {@code nettyMqttClient.pull(executionContext)}, lifting each returned future into
     * {@code IO} via {@code IO.fromFuture}.
     */
    public static final /* synthetic */ FreeC $anonfun$run$6(NettyMqttClient nettyMqttClient, ExecutionContext executionContext) {
        return Stream$.MODULE$.repeatEval(IO$.MODULE$.fromFuture(IO$.MODULE$.apply(() -> {
            return nettyMqttClient.pull(executionContext);
        })));
    }

    /**
     * Synthetic lambda body: for each packet in {@code freeC}, evaluates an {@code IO} that
     * calls {@code mqttActor.onReceived(packet, nettyMqttClient)}.
     */
    public static final /* synthetic */ FreeC $anonfun$run$9(MqttActor mqttActor, NettyMqttClient nettyMqttClient, FreeC freeC) {
        return Stream$.MODULE$.evalMap$extension(freeC, packet -> {
            return IO$.MODULE$.apply(() -> {
                mqttActor.onReceived(packet, nettyMqttClient);
            });
        });
    }

    /** Synthetic lambda body with no statements; used by {@code run} to map the final result to unit. */
    public static final /* synthetic */ void $anonfun$run$12(BoxedUnit boxedUnit) {
    }

    /**
     * Runs one session with an actor obtained from {@code function0.apply()} and attaches a
     * {@code recoverWith} handler to the resulting future.
     *
     * <p>The handler class is defined outside this file; it is constructed with the
     * scheduler, the duration, the execution context, the config and the actor factory.
     * NOTE(review): presumably it schedules another call to this method after
     * {@code finiteDuration} on failure — confirm in the handler class.
     */
    public final Future com$github$hyjay$mqtt$actor$MqttActor$$go$1(ConnectionConfig connectionConfig, Function0 function0, ExecutionContext executionContext, Scheduler scheduler, FiniteDuration finiteDuration) {
        return run(connectionConfig, (MqttActor) function0.apply(), executionContext, scheduler).recoverWith(new MqttActor$$anonfun$com$github$hyjay$mqtt$actor$MqttActor$$go$1$1(scheduler, finiteDuration, executionContext, connectionConfig, function0), executionContext);
    }

    /** Publishes this instance as {@code MODULE$} and creates the {@code "MqttActor"} logger. */
    private MqttActor$() {
        MODULE$ = this;
        this.com$github$hyjay$mqtt$actor$MqttActor$$logger = LoggerFactory.getLogger("MqttActor");
    }
}
