package akka.persistence.r2dbc.query;

import akka.NotUsed;
import akka.persistence.r2dbc.journal.JournalEntry;
import akka.stream.scaladsl.Source;
import akka.stream.scaladsl.Source$;
import java.util.concurrent.atomic.AtomicBoolean;
import scala.Option;
import scala.Predef$;
import scala.collection.StringOps$;
import scala.concurrent.duration.FiniteDuration;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: EventsByPersistenceIdStage.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005]sA\u0002\u000e\u001c\u0011\u0003Y2E\u0002\u0004&7!\u00051D\n\u0005\u0006[\u0005!\ta\f\u0005\u0006a\u0005!\t!\r\u0005\n\u0003\u007f\t\u0011\u0013!C\u0001\u0003\u00032Q!J\u000e\u00037MB\u0001bN\u0003\u0003\u0002\u0003\u0006I\u0001\u000f\u0005\tw\u0015\u0011\t\u0011)A\u0005y!Aq)\u0002B\u0001B\u0003%\u0001\n\u0003\u0005L\u000b\t\u0005\t\u0015!\u0003I\u0011!aUA!b\u0001\n\u0003i\u0005\u0002C-\u0006\u0005\u0003\u0005\u000b\u0011\u0002(\t\u000b5*A\u0011\u0002.\t\u000f\u0001,!\u0019!C\u000bC\"1Q.\u0002Q\u0001\u000e\tDqA\\\u0003A\u0002\u0013%q\u000eC\u0004q\u000b\u0001\u0007I\u0011B9\t\r],\u0001\u0015)\u0003I\u0011\u001dAX\u00011A\u0005\n=Dq!_\u0003A\u0002\u0013%!\u0010\u0003\u0004}\u000b\u0001\u0006K\u0001\u0013\u0005\b{\u0016\u0001\r\u0011\"\u0003p\u0011\u001dqX\u00011A\u0005\n}Dq!a\u0001\u0006A\u0003&\u0001\nC\u0004\u0002\u0006\u0015!\t&a\u0002\t\u000f\u0005eQ\u0001\"\u0015\u0002\u001c\u0005QRI^3oiN\u0014\u0015\u0010U3sg&\u001cH/\u001a8dK&#7\u000b^1hK*\u0011A$H\u0001\u0006cV,'/\u001f\u0006\u0003=}\tQA\u001d\u001aeE\u000eT!\u0001I\u0011\u0002\u0017A,'o]5ti\u0016t7-\u001a\u0006\u0002E\u0005!\u0011m[6b!\t!\u0013!D\u0001\u001c\u0005i)e/\u001a8ug\nK\b+\u001a:tSN$XM\\2f\u0013\u0012\u001cF/Y4f'\t\tq\u0005\u0005\u0002)W5\t\u0011FC\u0001+\u0003\u0015\u00198-\u00197b\u0013\ta\u0013F\u0001\u0004B]f\u0014VMZ\u0001\u0007y%t\u0017\u000e\u001e \u0004\u0001Q\t1%A\u0003baBd\u0017\u0010F\u00063\u0003k\t9$!\u000f\u0002<\u0005u\u0002C\u0001\u0013\u0006'\t)A\u0007\u0005\u0002%k%\u0011ag\u0007\u0002\u000e\u000bZ,g\u000e^:CsN#\u0018mZ3\u0002\u0007\u0011\fw\u000e\u0005\u0002%s%\u0011!h\u0007\u0002\t#V,'/\u001f#b_\u0006i\u0001/\u001a:tSN$XM\\2f\u0013\u0012\u0004\"!\u0010#\u000f\u0005y\u0012\u0005CA 
*\u001b\u0005\u0001%BA!/\u0003\u0019a$o\\8u}%\u00111)K\u0001\u0007!J,G-\u001a4\n\u0005\u00153%AB*ue&twM\u0003\u0002DS\u0005IaM]8n'\u0016\fhJ\u001d\t\u0003Q%K!AS\u0015\u0003\t1{gnZ\u0001\bi>\u001cV-\u001d(s\u0003=\u0011XM\u001a:fg\"Le\u000e^3sm\u0006dW#\u0001(\u0011\u0007!z\u0015+\u0003\u0002QS\t1q\n\u001d;j_:\u0004\"AU,\u000e\u0003MS!\u0001V+\u0002\u0011\u0011,(/\u0019;j_:T!AV\u0015\u0002\u0015\r|gnY;se\u0016tG/\u0003\u0002Y'\nqa)\u001b8ji\u0016$UO]1uS>t\u0017\u0001\u0005:fMJ,7\u000f[%oi\u0016\u0014h/\u00197!)\u0019\u00114\fX/_?\")q\u0007\u0004a\u0001q!)1\b\u0004a\u0001y!)q\t\u0004a\u0001\u0011\")1\n\u0004a\u0001\u0011\")A\n\u0004a\u0001\u001d\u0006q1m\\7qY\u0016$XmU<ji\u000eDW#\u00012\u0011\u0005\r\\W\"\u00013\u000b\u0005\u00154\u0017AB1u_6L7M\u0003\u0002WO*\u0011\u0001.[\u0001\u0005kRLGNC\u0001k\u0003\u0011Q\u0017M^1\n\u00051$'!D!u_6L7MQ8pY\u0016\fg.A\bd_6\u0004H.\u001a;f'^LGo\u00195!\u0003A\u0001(o\\2fgN,G-\u00128ue&,7/F\u0001I\u0003Q\u0001(o\\2fgN,G-\u00128ue&,7o\u0018\u0013fcR\u0011!/\u001e\t\u0003QML!\u0001^\u0015\u0003\tUs\u0017\u000e\u001e\u0005\bmB\t\t\u00111\u0001I\u0003\rAH%M\u0001\u0012aJ|7-Z:tK\u0012,e\u000e\u001e:jKN\u0004\u0013AC2veJ,g\u000e^*fc\u0006q1-\u001e:sK:$8+Z9`I\u0015\fHC\u0001:|\u0011\u001d18#!AA\u0002!\u000b1bY;se\u0016tGoU3rA\u0005IA/\u0019:hKR\u001cV-]\u0001\u000ei\u0006\u0014x-\u001a;TKF|F%Z9\u0015\u0007I\f\t\u0001C\u0004w-\u0005\u0005\t\u0019\u0001%\u0002\u0015Q\f'oZ3u'\u0016\f\b%A\u0006qkNDW\rZ#oiJLHc\u0001:\u0002\n!9\u00111\u0002\rA\u0002\u00055\u0011!B3oiJL\b\u0003BA\b\u0003+i!!!\u0005\u000b\u0007\u0005MQ$A\u0004k_V\u0014h.\u00197\n\t\u0005]\u0011\u0011\u0003\u0002\r\u0015>,(O\\1m\u000b:$(/_\u0001\fM\u0016$8\r[#wK:$8\u000f\u0006\u0002\u0002\u001eAA\u0011qDA\u0015\u0003\u001b\ti#\u0004\u0002\u0002\")!\u00111EA\u0013\u0003!\u00198-\u00197bINd'bAA\u0014C\u000511\u000f\u001e:fC6LA!a\u000b\u0002\"\t11k\\;sG\u0016\u0004B!a\f\u000225\t\u0011%C\u0002\u00024\u0005\u0012qAT8u+N,G\rC\u00038\u0007\u0001\u0007\u0001\bC\u0003<\u0007\u0001\u0007A\b
C\u0003H\u0007\u0001\u0007\u0001\nC\u0003L\u0007\u0001\u0007\u0001\nC\u0004M\u0007A\u0005\t\u0019\u0001(\u0002\u001f\u0005\u0004\b\u000f\\=%I\u00164\u0017-\u001e7uIU*\"!a\u0011+\u00079\u000b)e\u000b\u0002\u0002HA!\u0011\u0011JA*\u001b\t\tYE\u0003\u0003\u0002N\u0005=\u0013!C;oG\",7m[3e\u0015\r\t\t&K\u0001\u000bC:tw\u000e^1uS>t\u0017\u0002BA+\u0003\u0017\u0012\u0011#\u001e8dQ\u0016\u001c7.\u001a3WCJL\u0017M\\2f\u0001")
/* loaded from: input_file:akka/persistence/r2dbc/query/EventsByPersistenceIdStage.class */
/**
 * {@link EventsByStage} implementation that produces the {@link JournalEntry} elements of a
 * single persistence id.
 *
 * <p>Each call to {@link #fetchEvents()} first asks the {@code QueryDao} for a sequence number via
 * {@code findHighestSeq} and then requests the entries between the current position and the
 * resulting target. The stage keeps three pieces of mutable bookkeeping: how many entries have been
 * pushed, the sequence number of the last pushed entry, and the target of the last fetch.
 *
 * <p>NOTE(review): this class is decompiled Scala; the mutable fields carry no synchronization
 * here, so it presumably relies on the enclosing stage's single-threaded callbacks — confirm
 * against {@code EventsByStage}.
 */
public final class EventsByPersistenceIdStage extends EventsByStage {
    /** Data access object used to look up the highest sequence number and to fetch entries. */
    private final QueryDao dao;
    /** Persistence id whose entries are queried. */
    private final String persistenceId;
    /** Upper bound for the sequence numbers requested from the dao. */
    private final long toSeqNr;
    /** Value handed back by {@link #refreshInterval()}; not interpreted by this class. */
    private final Option<FiniteDuration> refreshInterval;
    /** Set to {@code true} once the looked-up sequence number is at or beyond {@link #toSeqNr}. */
    private final AtomicBoolean completeSwitch;
    /** Number of entries reported through {@link #pushedEntry(JournalEntry)}. */
    private long processedEntries;
    /** Starts at the constructor's 'fromSeqNr', then tracks the last pushed entry's sequence number. */
    private long currentSeq;
    /** Upper bound used by the most recent fetch; {@code 0} before the first one. */
    private long targetSeq;

    /**
     * Creates the stage after storing the immutable arguments and validating them.
     *
     * @param dao             query dao; must not be {@code null}
     * @param persistenceId   persistence id to query; must be non-null and non-empty
     * @param fromSeqNr       initial value of the current sequence number; must be {@code >= 0}
     *                        and strictly less than {@code toSeqNr}
     * @param toSeqNr         upper sequence number bound; must be {@code >= 0}
     * @param refreshInterval value exposed through {@link #refreshInterval()}
     * @throws IllegalArgumentException if any of the requirements above is violated
     *                                  (raised by Scala's {@code Predef.require})
     */
    public EventsByPersistenceIdStage(QueryDao dao, String persistenceId, long fromSeqNr, long toSeqNr, Option<FiniteDuration> refreshInterval) {
        this.dao = dao;
        this.persistenceId = persistenceId;
        this.toSeqNr = toSeqNr;
        this.refreshInterval = refreshInterval;

        // The checks run in this fixed order, so the first violated requirement decides the message.
        Predef$.MODULE$.require(dao != null, () -> "the 'dao' must be provided");
        Predef$.MODULE$.require(
                persistenceId != null && StringOps$.MODULE$.nonEmpty$extension(Predef$.MODULE$.augmentString(persistenceId)),
                () -> "the 'persistenceId' must be provided");
        Predef$.MODULE$.require(fromSeqNr >= 0, () -> "the 'fromSeqNr' must be >= 0");
        Predef$.MODULE$.require(toSeqNr >= 0, () -> "the 'toSeqNr' must be >= 0");
        Predef$.MODULE$.require(fromSeqNr < toSeqNr, () -> "the 'fromSeqNr' must be < the 'toSeqNr'");

        this.completeSwitch = new AtomicBoolean(false);
        this.processedEntries = 0L;
        this.currentSeq = fromSeqNr;
        this.targetSeq = 0L;
    }

    /**
     * Static forwarder to the Scala companion object's {@code apply}.
     *
     * @return whatever {@code EventsByPersistenceIdStage$.MODULE$.apply} returns for the same
     *         arguments
     */
    public static EventsByPersistenceIdStage apply(QueryDao dao, String persistenceId, long fromSeqNr, long toSeqNr, Option<FiniteDuration> refreshInterval) {
        return EventsByPersistenceIdStage$.MODULE$.apply(dao, persistenceId, fromSeqNr, toSeqNr, refreshInterval);
    }

    /** Returns the refresh interval supplied at construction time. */
    @Override // akka.persistence.r2dbc.query.EventsByStage
    public Option<FiniteDuration> refreshInterval() {
        return refreshInterval;
    }

    /** Returns the flag that this stage flips to {@code true} when the upper bound is reached. */
    @Override // akka.persistence.r2dbc.query.EventsByStage
    public final AtomicBoolean completeSwitch() {
        return completeSwitch;
    }

    // Scala-generated private accessors for the mutable bookkeeping fields.

    private long processedEntries() {
        return processedEntries;
    }

    private void processedEntries_$eq(long value) {
        processedEntries = value;
    }

    private long currentSeq() {
        return currentSeq;
    }

    private void currentSeq_$eq(long value) {
        currentSeq = value;
    }

    private long targetSeq() {
        return targetSeq;
    }

    private void targetSeq_$eq(long value) {
        targetSeq = value;
    }

    /**
     * Records that an entry was pushed: increments the processed counter and remembers the
     * entry's sequence number as the current position.
     *
     * @param entry the entry that was pushed
     */
    @Override // akka.persistence.r2dbc.query.EventsByStage
    public void pushedEntry(JournalEntry entry) {
        processedEntries_$eq(processedEntries() + 1);
        currentSeq_$eq(entry.sequenceNr());
    }

    /**
     * Builds the source for the next batch of entries: every element emitted by
     * {@code dao.findHighestSeq(persistenceId)} is unboxed to a {@code long} and mapped, via
     * {@code flatMapConcat}, to the source returned by the fetch helper below.
     *
     * @return a source of journal entries for the configured persistence id
     */
    @Override // akka.persistence.r2dbc.query.EventsByStage
    public Source<JournalEntry, NotUsed> fetchEvents() {
        return dao.findHighestSeq(persistenceId)
                .flatMapConcat(boxedSeq -> $anonfun$fetchEvents$1(this, BoxesRunTime.unboxToLong(boxedSeq)));
    }

    /**
     * Compiler-generated body of the {@code flatMapConcat} function used by {@link #fetchEvents()}.
     *
     * <p>When {@code highestSeq} equals the target of the previous fetch, an empty source is
     * returned. Otherwise the target becomes {@code min(highestSeq, toSeqNr)}, the complete switch
     * is set when {@code highestSeq >= toSeqNr}, and the dao is asked for the entries between the
     * current position and the new target.
     *
     * @param stage      the stage instance the lambda was created for
     * @param highestSeq value emitted by {@code findHighestSeq}; presumably the highest stored
     *                   sequence number for the persistence id — verify against {@code QueryDao}
     * @return the source of entries to emit, or an empty source if the target did not move
     */
    public static final /* synthetic */ Source $anonfun$fetchEvents$1(EventsByPersistenceIdStage stage, long highestSeq) {
        if (highestSeq == stage.targetSeq()) {
            // Same value as the previous target: nothing to fetch this round.
            return Source$.MODULE$.empty();
        }

        final boolean upperBoundReached = highestSeq >= stage.toSeqNr;
        if (upperBoundReached) {
            stage.completeSwitch().set(true);
        }
        // Never request beyond the configured upper bound.
        stage.targetSeq_$eq(upperBoundReached ? stage.toSeqNr : highestSeq);

        // Before anything was pushed, start at the initial position itself; afterwards start one
        // past the last pushed entry.
        // NOTE(review): this looks like fetchByPersistenceId treats its lower bound as inclusive — confirm.
        final long fetchFrom = stage.processedEntries() == 0 ? stage.currentSeq() : stage.currentSeq() + 1;
        return stage.dao.fetchByPersistenceId(stage.persistenceId, fetchFrom, stage.targetSeq());
    }
}
