package akka.actor.typed.delivery.internal;

import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorRef$;
import akka.actor.typed.ActorRef$ActorRefOps$;
import akka.actor.typed.Behavior;
import akka.actor.typed.DispatcherSelector$;
import akka.actor.typed.delivery.ConsumerController;
import akka.actor.typed.delivery.DurableProducerQueue;
import akka.actor.typed.delivery.DurableProducerQueue$MessageSent$;
import akka.actor.typed.delivery.DurableProducerQueue$State$;
import akka.actor.typed.delivery.WorkPullingProducerController;
import akka.actor.typed.delivery.internal.WorkPullingProducerControllerImpl;
import akka.actor.typed.receptionist.Receptionist;
import akka.actor.typed.receptionist.Receptionist$Subscribe$;
import akka.actor.typed.receptionist.ServiceKey;
import akka.actor.typed.scaladsl.ActorContext;
import akka.actor.typed.scaladsl.Behaviors$;
import akka.actor.typed.scaladsl.StashBuffer;
import akka.annotation.InternalApi;
import akka.util.Timeout;
import akka.util.Timeout$;
import java.util.concurrent.TimeoutException;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple5;
import scala.collection.immutable.Map;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;
import scala.util.Failure;
import scala.util.Success;

/* compiled from: WorkPullingProducerControllerImpl.scala */
@InternalApi
/* loaded from: input_file:akka/actor/typed/delivery/internal/WorkPullingProducerControllerImpl$.class */
public final class WorkPullingProducerControllerImpl$ {
    /**
     * The single shared instance of this class. The constructor is private, so this is the only
     * instance; methods in this file reach each other through it.
     */
    public static final WorkPullingProducerControllerImpl$ MODULE$ = new WorkPullingProducerControllerImpl$();

    /**
     * Builds the initial behavior of the work-pulling producer controller.
     *
     * <p>The behavior is wrapped in a stash of {@code settings.bufferSize()} entries. On setup it:
     * <ol>
     *   <li>sets the logger name,</li>
     *   <li>subscribes to the receptionist for {@code serviceKey}; every listing is adapted into a
     *       {@code CurrentWorkers} message,</li>
     *   <li>spawns the durable queue (if {@code option} is defined) and asks it for its state,</li>
     *   <li>enters {@code waitingForStart}, wrapped with a static MDC entry
     *       {@code "producerId" -> str}.</li>
     * </ol>
     *
     * @param str        producer id, used as the {@code producerId} MDC value
     * @param serviceKey key used for the receptionist subscription
     * @param option     optional behavior of the durable producer queue
     * @param settings   controller settings
     * @param classTag   class tag of the message type
     * @return the behavior, narrowed to the public command type
     */
    public <A> Behavior<WorkPullingProducerController.Command<A>> apply(String str, ServiceKey<ConsumerController.Command<A>> serviceKey, Option<Behavior<DurableProducerQueue.Command<A>>> option, WorkPullingProducerController.Settings settings, ClassTag<A> classTag) {
        return Behaviors$.MODULE$.withStash(settings.bufferSize(), stash -> Behaviors$.MODULE$.setup(ctx -> {
            // Single static MDC entry: "producerId" -> str.
            Tuple2[] mdcEntries = {Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("producerId"), str)};
            Map<String, String> staticMdc = (Map) Predef$.MODULE$.Map().apply2(ScalaRunTime$.MODULE$.wrapRefArray(mdcEntries));

            ctx.setLoggerName("akka.actor.typed.delivery.WorkPullingProducerController");

            // Subscribe to listings for serviceKey; each listing arrives as a CurrentWorkers message.
            ActorRef$ActorRefOps$.MODULE$.$bang$extension(
                    ActorRef$.MODULE$.ActorRefOps(ctx.system().receptionist()),
                    Receptionist$Subscribe$.MODULE$.apply(
                            serviceKey,
                            ctx.messageAdapter(
                                    workerListing -> new WorkPullingProducerControllerImpl.CurrentWorkers(workerListing.allServiceInstances(serviceKey)),
                                    ClassTag$.MODULE$.apply(Receptionist.Listing.class))));

            // Defined only when a durable queue behavior was supplied.
            Option durableQueue = MODULE$.askLoadState(ctx, option, settings, classTag);
            Behavior initial = (Behavior) MODULE$.waitingForStart(str, ctx, stash, durableQueue, settings, None$.MODULE$, MODULE$.createInitialState(durableQueue.nonEmpty(), classTag), classTag);
            return Behaviors$.MODULE$.withMdc(staticMdc, initial, ClassTag$.MODULE$.apply(WorkPullingProducerControllerImpl.InternalCommand.class));
        })).narrow();
    }

    /**
     * Chooses the durable-queue state to start with.
     *
     * @param z        {@code true} when the state still has to be loaded from a durable queue
     * @param classTag class tag of the message type (not used in the body)
     * @return {@code None} when {@code z} is {@code true}; otherwise {@code Some} of the empty state
     */
    private <A> Option<DurableProducerQueue.State<A>> createInitialState(boolean z, ClassTag<A> classTag) {
        Option initial;
        if (z) {
            initial = None$.MODULE$;
        } else {
            initial = new Some(DurableProducerQueue$State$.MODULE$.empty());
        }
        return initial;
    }

    /**
     * Behavior used until both the producer ({@code Start}) and the durable-queue state are known.
     *
     * <ul>
     *   <li>{@code Start}: verifies the producer via {@code enforceLocalProducer}; becomes active if
     *       the state ({@code option3}) is already defined, otherwise keeps waiting with the
     *       producer remembered.</li>
     *   <li>{@code LoadStateReply}: becomes active if the producer ({@code option2}) is already
     *       defined, otherwise keeps waiting with the loaded state remembered.</li>
     *   <li>{@code LoadStateFailed}: retries the load with {@code attempt + 1}, or logs an error and
     *       throws {@link TimeoutException} once the attempt count has reached
     *       {@code durableQueueRetryAttempts}.</li>
     *   <li>{@code DurableQueueTerminated}: throws {@link IllegalStateException}.</li>
     *   <li>Anything else is stashed, after checking that the stash is not full.</li>
     * </ul>
     *
     * @param str         producer id
     * @param actorContext context of the controller actor
     * @param stashBuffer stash for messages that arrive before activation
     * @param option      optional reference to the durable queue
     * @param settings    controller settings
     * @param option2     producer received so far, if any
     * @param option3     durable-queue state received so far, if any
     * @param classTag    class tag of the message type
     */
    private <A> Behavior<WorkPullingProducerControllerImpl.InternalCommand> waitingForStart(String str, ActorContext<WorkPullingProducerControllerImpl.InternalCommand> actorContext, StashBuffer<WorkPullingProducerControllerImpl.InternalCommand> stashBuffer, Option<ActorRef<DurableProducerQueue.Command<A>>> option, WorkPullingProducerController.Settings settings, Option<ActorRef<WorkPullingProducerController.RequestNext<A>>> option2, Option<DurableProducerQueue.State<A>> option3, ClassTag<A> classTag) {
        return (Behavior) Behaviors$.MODULE$.receiveMessage(msg -> {
            if (msg instanceof WorkPullingProducerController.Start) {
                WorkPullingProducerController.Start startMsg = (WorkPullingProducerController.Start) msg;
                ProducerControllerImpl$.MODULE$.enforceLocalProducer(startMsg.producer());
                if (option3 instanceof Some) {
                    // State already available: producer + state are both known now.
                    return this.becomeActive$1(startMsg.producer(), (DurableProducerQueue.State) ((Some) option3).value(), actorContext, classTag, stashBuffer, str, option, settings);
                }
                if (None$.MODULE$.equals(option3)) {
                    // Still waiting for the state; remember the producer.
                    return MODULE$.waitingForStart(str, actorContext, stashBuffer, option, settings, new Some(startMsg.producer()), option3, classTag);
                }
                throw new MatchError(option3);
            }

            if (msg instanceof WorkPullingProducerControllerImpl.LoadStateReply) {
                WorkPullingProducerControllerImpl.LoadStateReply reply = (WorkPullingProducerControllerImpl.LoadStateReply) msg;
                if (option2 instanceof Some) {
                    // Producer already available: producer + state are both known now.
                    return this.becomeActive$1((ActorRef) ((Some) option2).value(), reply.state(), actorContext, classTag, stashBuffer, str, option, settings);
                }
                if (None$.MODULE$.equals(option2)) {
                    // Still waiting for Start; remember the loaded state.
                    return MODULE$.waitingForStart(str, actorContext, stashBuffer, option, settings, option2, new Some(reply.state()), classTag);
                }
                throw new MatchError(option2);
            }

            if (msg instanceof WorkPullingProducerControllerImpl.LoadStateFailed) {
                int attempt = ((WorkPullingProducerControllerImpl.LoadStateFailed) msg).attempt();
                if (attempt >= settings.producerControllerSettings().durableQueueRetryAttempts()) {
                    String errorMessage = "LoadState failed after [" + attempt + "] attempts, giving up.";
                    actorContext.log().error(errorMessage);
                    throw new TimeoutException(errorMessage);
                }
                actorContext.log().warn("LoadState failed, attempt [{}] of [{}], retrying.", BoxesRunTime.boxToInteger(attempt), BoxesRunTime.boxToInteger(settings.producerControllerSettings().durableQueueRetryAttempts()));
                MODULE$.askLoadState(actorContext, option, settings, attempt + 1, classTag);
                return Behaviors$.MODULE$.same();
            }

            if (WorkPullingProducerControllerImpl$DurableQueueTerminated$.MODULE$.equals(msg)) {
                throw new IllegalStateException("DurableQueue was unexpectedly terminated.");
            }

            // Any other message is kept for later, once the controller becomes active.
            MODULE$.akka$actor$typed$delivery$internal$WorkPullingProducerControllerImpl$$checkStashFull(stashBuffer, classTag);
            stashBuffer.stash(msg);
            return Behaviors$.MODULE$.same();
        });
    }

    /**
     * Fails fast when the stash has no room left.
     *
     * @param stashBuffer stash to inspect
     * @param classTag    class tag of the message type (not used in the body)
     * @throws IllegalArgumentException if {@code stashBuffer.isFull()}; the message includes the
     *                                  current stash size
     */
    public <A> void akka$actor$typed$delivery$internal$WorkPullingProducerControllerImpl$$checkStashFull(StashBuffer<WorkPullingProducerControllerImpl.InternalCommand> stashBuffer, ClassTag<A> classTag) {
        if (!stashBuffer.isFull()) {
            return;
        }
        throw new IllegalArgumentException("Buffer is full, size [" + stashBuffer.size() + "].");
    }

    /**
     * Spawns the durable queue, if a behavior for it was supplied, and requests its state.
     *
     * <p>When {@code option} is defined, the behavior is spawned as a child named
     * {@code "durable"} using the {@code sameAsParent} dispatcher selector, watched with
     * {@code DurableQueueTerminated}, and asked for its state as attempt {@code 1}.
     *
     * @param actorContext context of the controller actor
     * @param option       optional behavior of the durable producer queue
     * @param settings     controller settings
     * @param classTag     class tag of the message type
     * @return {@code Some} of the spawned queue reference, or {@code None} when {@code option} is empty
     */
    private <A> Option<ActorRef<DurableProducerQueue.Command<A>>> askLoadState(ActorContext<WorkPullingProducerControllerImpl.InternalCommand> actorContext, Option<Behavior<DurableProducerQueue.Command<A>>> option, WorkPullingProducerController.Settings settings, ClassTag<A> classTag) {
        if (option.isEmpty()) {
            // No durable queue configured: nothing to spawn, nothing to ask.
            return (Option) None$.MODULE$;
        }
        ActorRef<DurableProducerQueue.Command<A>> durableQueue = actorContext.spawn(option.get(), "durable", DispatcherSelector$.MODULE$.sameAsParent());
        actorContext.watchWith(durableQueue, WorkPullingProducerControllerImpl$DurableQueueTerminated$.MODULE$);
        MODULE$.askLoadState(actorContext, new Some(durableQueue), settings, 1, classTag);
        return new Some<>(durableQueue);
    }

    /**
     * Sends a load-state request to the durable queue, if there is one.
     *
     * <p>The request timeout is derived from {@code durableQueueRequestTimeout}. Nothing is sent
     * when {@code option} is empty.
     *
     * @param actorContext context of the controller actor
     * @param option       optional reference to the durable queue
     * @param settings     controller settings
     * @param i            attempt number carried into {@code LoadStateFailed} if the ask fails
     * @param classTag     class tag of the message type (not used in the body)
     */
    private <A> void askLoadState(ActorContext<WorkPullingProducerControllerImpl.InternalCommand> actorContext, Option<ActorRef<DurableProducerQueue.Command<A>>> option, WorkPullingProducerController.Settings settings, int i, ClassTag<A> classTag) {
        Timeout requestTimeout = Timeout$.MODULE$.durationToTimeout(settings.producerControllerSettings().durableQueueRequestTimeout());
        if (option.nonEmpty()) {
            $anonfun$askLoadState$2(actorContext, i, requestTimeout, option.get());
        }
    }

    /**
     * Creates the controller state used when becoming active.
     *
     * <p>Apart from the sequence number and the producer reference, the state starts out with an
     * empty set, four empty maps and a {@code false} flag.
     *
     * @param j        current sequence number
     * @param actorRef producer that receives {@code RequestNext}
     * @return the freshly created state
     */
    private <A> WorkPullingProducerControllerImpl.State<A> createInitialState(long j, ActorRef<WorkPullingProducerController.RequestNext<A>> actorRef) {
        Predef$ predef = Predef$.MODULE$;
        return new WorkPullingProducerControllerImpl.State<>(
                j,
                predef.Set().empty2(),
                predef.Map().empty2(),
                predef.Map().empty2(),
                predef.Map().empty2(),
                predef.Map().empty2(),
                actorRef,
                false);
    }

    /**
     * Re-sends one unconfirmed durable message to the controller itself.
     *
     * <p>Takes the first, second and fourth components of {@code messageSent} and tells
     * {@code actorContext.self()} a {@code ResendDurableMsg} built from the second and fourth
     * components plus the first one unboxed as a {@code long}.
     *
     * @param actorContext context of the controller actor
     * @param messageSent  the unconfirmed message record
     * @throws MatchError if {@code messageSent} is {@code null} or cannot be destructured
     */
    public static final /* synthetic */ void $anonfun$waitingForStart$1(ActorContext actorContext, DurableProducerQueue.MessageSent messageSent) {
        if (messageSent == null) {
            throw new MatchError(messageSent);
        }
        Option<Tuple5<Object, Object, Object, String, Object>> extracted = DurableProducerQueue$MessageSent$.MODULE$.unapply(messageSent);
        if (extracted.isEmpty()) {
            throw new MatchError(messageSent);
        }
        Tuple5<Object, Object, Object, String, Object> fields = extracted.get();
        long seqNr = BoxesRunTime.unboxToLong(fields._1());
        ActorRef$ActorRefOps$.MODULE$.$bang$extension(
                ActorRef$.MODULE$.ActorRefOps(actorContext.self()),
                new WorkPullingProducerControllerImpl.ResendDurableMsg(fields._2(), fields._4(), seqNr));
    }

    /**
     * Switches from the waiting phase to the active behavior.
     *
     * <p>First every entry of {@code state.unconfirmed()} is re-sent to self. Then a
     * {@code RequestNext} is built from a message adapter (wrapping each message in a
     * {@code Msg}) and the controller's own reference, a {@code WorkPullingProducerControllerImpl}
     * is created, and its active behavior is started from an initial state based on
     * {@code state.currentSeqNr()}. Finally all stashed messages are unstashed into that behavior.
     *
     * @param actorRef     producer that receives {@code RequestNext}
     * @param state        durable-queue state to start from
     * @param actorContext context of the controller actor
     * @param classTag     class tag of the message type
     * @param stashBuffer  stash holding messages received while waiting
     * @param str          producer id
     * @param option       optional reference to the durable queue
     * @param settings     controller settings
     * @return the behavior returned by {@code stashBuffer.unstashAll}
     */
    private final Behavior becomeActive$1(ActorRef actorRef, DurableProducerQueue.State state, ActorContext actorContext, ClassTag classTag, StashBuffer stashBuffer, String str, Option option, WorkPullingProducerController.Settings settings) {
        // Unconfirmed messages go back to self before the active behavior is built.
        state.unconfirmed().foreach(unconfirmed -> {
            $anonfun$waitingForStart$1(actorContext, unconfirmed);
            return BoxedUnit.UNIT;
        });

        ActorRef msgAdapter = actorContext.messageAdapter(payload -> new WorkPullingProducerControllerImpl.Msg(payload, false, None$.MODULE$), classTag);
        WorkPullingProducerController.RequestNext requestNext = new WorkPullingProducerController.RequestNext(msgAdapter, actorContext.self());
        WorkPullingProducerControllerImpl controller = new WorkPullingProducerControllerImpl(actorContext, stashBuffer, str, requestNext, option, settings, classTag);
        Behavior active = controller.akka$actor$typed$delivery$internal$WorkPullingProducerControllerImpl$$active(createInitialState(state.currentSeqNr(), actorRef));
        return stashBuffer.unstashAll(active);
    }

    /**
     * Asks the durable queue for its state and maps the outcome to an internal command.
     *
     * <p>A {@code Success} becomes a {@code LoadStateReply} carrying the loaded state; a
     * {@code Failure} becomes {@code LoadStateFailed(i)}. Any other outcome raises
     * {@link MatchError}.
     *
     * @param actorContext context of the controller actor
     * @param i            attempt number to report on failure
     * @param timeout      timeout for the ask
     * @param actorRef     the durable queue to ask
     */
    public static final /* synthetic */ void $anonfun$askLoadState$2(ActorContext actorContext, int i, Timeout timeout, ActorRef actorRef) {
        actorContext.ask(
                actorRef,
                replyTo -> new DurableProducerQueue.LoadState(replyTo),
                outcome -> {
                    if (outcome instanceof Success) {
                        return new WorkPullingProducerControllerImpl.LoadStateReply((DurableProducerQueue.State) ((Success) outcome).value());
                    }
                    if (outcome instanceof Failure) {
                        return new WorkPullingProducerControllerImpl.LoadStateFailed(i);
                    }
                    throw new MatchError(outcome);
                },
                timeout,
                ClassTag$.MODULE$.apply(DurableProducerQueue.State.class));
    }

    /** Private so that no instance other than {@code MODULE$} can be created. */
    private WorkPullingProducerControllerImpl$() {
    }
}
