package akka.persistence.r2dbc.query;

import akka.NotUsed;
import akka.persistence.r2dbc.journal.JournalEntry;
import akka.stream.scaladsl.Source;
import akka.stream.scaladsl.Source$;
import java.util.concurrent.atomic.AtomicBoolean;
import scala.Option;
import scala.Predef$;
import scala.collection.StringOps$;
import scala.concurrent.duration.FiniteDuration;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: EventsByTagStage.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005=sAB\r\u001b\u0011\u0003Q\"E\u0002\u0004%5!\u0005!$\n\u0005\u0006Y\u0005!\tA\f\u0005\u0006_\u0005!\t\u0001\r\u0005\n\u0003o\t\u0011\u0013!C\u0001\u0003s1Q\u0001\n\u000e\u00035IB\u0001BN\u0003\u0003\u0002\u0003\u0006Ia\u000e\u0005\tu\u0015\u0011\t\u0011)A\u0005w!Aa)\u0002B\u0001B\u0003%q\t\u0003\u0005K\u000b\t\u0015\r\u0011\"\u0001L\u0011!9VA!A!\u0002\u0013a\u0005\"\u0002\u0017\u0006\t\u0013A\u0006bB/\u0006\u0001\u0004%IA\u0018\u0005\b?\u0016\u0001\r\u0011\"\u0003a\u0011\u00191W\u0001)Q\u0005\u000f\"9q-\u0002a\u0001\n\u0013q\u0006b\u00025\u0006\u0001\u0004%I!\u001b\u0005\u0007W\u0016\u0001\u000b\u0015B$\t\u000f1,\u0001\u0019!C\u0005=\"9Q.\u0002a\u0001\n\u0013q\u0007B\u00029\u0006A\u0003&q\tC\u0004r\u000b\t\u0007IQ\u0003:\t\ry,\u0001\u0015!\u0004t\u0011\u0019yX\u0001\"\u0015\u0002\u0002!9\u00111C\u0003\u0005R\u0005U\u0011\u0001E#wK:$8OQ=UC\u001e\u001cF/Y4f\u0015\tYB$A\u0003rk\u0016\u0014\u0018P\u0003\u0002\u001e=\u0005)!O\r3cG*\u0011q\u0004I\u0001\fa\u0016\u00148/[:uK:\u001cWMC\u0001\"\u0003\u0011\t7n[1\u0011\u0005\r\nQ\"\u0001\u000e\u0003!\u00153XM\u001c;t\u0005f$\u0016mZ*uC\u001e,7CA\u0001'!\t9#&D\u0001)\u0015\u0005I\u0013!B:dC2\f\u0017BA\u0016)\u0005\u0019\te.\u001f*fM\u00061A(\u001b8jiz\u001a\u0001\u0001F\u0001#\u0003\u0015\t\u0007\u000f\u001d7z)%\t\u0014qFA\u0019\u0003g\t)\u0004\u0005\u0002$\u000bM\u0011Qa\r\t\u0003GQJ!!\u000e\u000e\u0003\u001b\u00153XM\u001c;t\u0005f\u001cF/Y4f\u0003\r!\u0017m\u001c\t\u0003GaJ!!\u000f\u000e\u0003\u0011E+XM]=EC>\f1\u0001^1h!\ta4I\u0004\u0002>\u0003B\u0011a\bK\u0007\u0002\u007f)\u0011\u0001)L\u0001\u0007yI|w\u000e\u001e \n\u0005\tC\u0013A\u0002)sK\u0012,g-\u0003\u0002E\u000b\n11\u000b\u001e:j]\u001eT!A\u0011\u0015\u0002\r=4gm]3u!\t9\u0003*\u0003\u0002JQ\t!Aj\u001c8h\u0003=\u0011XM\u001a:fg\"Le\u000e^3sm\u0006dW#\u0001'\u0011\u0007\u001dju*\u0003\u0002OQ\t1q\n\u001d;j_:\u0004\"\u0001U+\u000e\u0003ES!AU*\u0002\u0011\u0011,(/\u0019;j_:T!\u0001\u0016\u0015\u0002\u0015\r|gnY;se\u0016tG/\u0003\u0002W#\nqa)\u001b8ji\u0016$UO]1uS>t\u0017\u0001\u0005:fMJ,7\u000f[%oi\u0016\u0014h/\u00197!)\u0015\t\u0014LW.]\u0011\u001514\u00021\u00018\u0011\u0015Q4\u00021\u0001<\u0011\u001515\u00021\u0001H\u0011\u0015Q5\u00021\u0001M\u0003A\u0001(o\\2fgN,G-\u00128ue&,7/F\u0001H\u0003Q\u0001(o\\2fgN,G-\u00128ue&,7o\u0018\u0013fcR\u0011\u0011\r\u001a\t\u0003O\tL!a\u0019\u0015\u0003\tUs\u0017\u000e\u001e\u0005\bK6\t\t\u00111\u0001H\u0003\rAH%M\u0001\u0012aJ|7-Z:tK\u0012,e\u000e\u001e:jKN\u0004\u0013\u0001D2veJ,g\u000e^%oI\u0016D\u0018\u0001E2veJ,g\u000e^%oI\u0016Dx\fJ3r)\t\t'\u000eC\u0004f!\u0005\u0005\t\u0019A$\u0002\u001b\r,(O]3oi&sG-\u001a=!\u0003-!\u0018M]4fi&sG-\u001a=\u0002\u001fQ\f'oZ3u\u0013:$W\r_0%KF$\"!Y8\t\u000f\u0015\u001c\u0012\u0011!a\u0001\u000f\u0006aA/\u0019:hKRLe\u000eZ3yA\u0005q1m\\7qY\u0016$XmU<ji\u000eDW#A:\u0011\u0005QdX\"A;\u000b\u0005Y<\u0018AB1u_6L7M\u0003\u0002Uq*\u0011\u0011P_\u0001\u0005kRLGNC\u0001|\u0003\u0011Q\u0017M^1\n\u0005u,(!D!u_6L7MQ8pY\u0016\fg.A\bd_6\u0004H.\u001a;f'^LGo\u00195!\u0003-\u0001Xo\u001d5fI\u0016sGO]=\u0015\u0007\u0005\f\u0019\u0001C\u0004\u0002\u0006]\u0001\r!a\u0002\u0002\u000b\u0015tGO]=\u0011\t\u0005%\u0011qB\u0007\u0003\u0003\u0017Q1!!\u0004\u001d\u0003\u001dQw.\u001e:oC2LA!!\u0005\u0002\f\ta!j\\;s]\u0006dWI\u001c;ss\u0006Ya-\u001a;dQ\u00163XM\u001c;t)\t\t9\u0002\u0005\u0005\u0002\u001a\u0005\r\u0012qAA\u0014\u001b\t\tYB\u0003\u0003\u0002\u001e\u0005}\u0011\u0001C:dC2\fGm\u001d7\u000b\u0007\u0005\u0005\u0002%\u0001\u0004tiJ,\u0017-\\\u0005\u0005\u0003K\tYB\u0001\u0004T_V\u00148-\u001a\t\u0005\u0003S\tY#D\u0001!\u0013\r\ti\u0003\t\u0002\b\u001d>$Xk]3e\u0011\u001514\u00011\u00018\u0011\u0015Q4\u00011\u0001<\u0011\u001515\u00011\u0001H\u0011\u001dQ5\u0001%AA\u00021\u000bq\"\u00199qYf$C-\u001a4bk2$H\u0005N\u000b\u0003\u0003wQ3\u0001TA\u001fW\t\ty\u0004\u0005\u0003\u0002B\u0005-SBAA\"\u0015\u0011\t)%a\u0012\u0002\u0013Ut7\r[3dW\u0016$'bAA%Q\u0005Q\u0011M\u001c8pi\u0006$\u0018n\u001c8\n\t\u00055\u00131\t\u0002\u0012k:\u001c\u0007.Z2lK\u00124\u0016M]5b]\u000e,\u0007")
/* loaded from: input_file:akka/persistence/r2dbc/query/EventsByTagStage.class */
public final class EventsByTagStage extends EventsByStage {
    private final QueryDao dao;
    private final String tag;
    private final Option<FiniteDuration> refreshInterval;
    private long processedEntries;
    private long currentIndex;
    private long targetIndex;
    private final AtomicBoolean completeSwitch;

    public static EventsByTagStage apply(QueryDao queryDao, String str, long j, Option<FiniteDuration> option) {
        return EventsByTagStage$.MODULE$.apply(queryDao, str, j, option);
    }

    @Override // akka.persistence.r2dbc.query.EventsByStage
    public Option<FiniteDuration> refreshInterval() {
        return this.refreshInterval;
    }

    private long processedEntries() {
        return this.processedEntries;
    }

    private void processedEntries_$eq(long j) {
        this.processedEntries = j;
    }

    private long currentIndex() {
        return this.currentIndex;
    }

    private void currentIndex_$eq(long j) {
        this.currentIndex = j;
    }

    private long targetIndex() {
        return this.targetIndex;
    }

    private void targetIndex_$eq(long j) {
        this.targetIndex = j;
    }

    @Override // akka.persistence.r2dbc.query.EventsByStage
    public final AtomicBoolean completeSwitch() {
        return this.completeSwitch;
    }

    @Override // akka.persistence.r2dbc.query.EventsByStage
    public void pushedEntry(JournalEntry journalEntry) {
        processedEntries_$eq(processedEntries() + 1);
        currentIndex_$eq(journalEntry.id());
    }

    @Override // akka.persistence.r2dbc.query.EventsByStage
    public Source<JournalEntry, NotUsed> fetchEvents() {
        return this.dao.findHighestIndex(this.tag).flatMapConcat(obj -> {
            return $anonfun$fetchEvents$1(this, BoxesRunTime.unboxToLong(obj));
        });
    }

    public static final /* synthetic */ Source $anonfun$fetchEvents$1(EventsByTagStage eventsByTagStage, long j) {
        if (eventsByTagStage.targetIndex() == j) {
            return Source$.MODULE$.empty();
        }
        eventsByTagStage.targetIndex_$eq(j);
        return eventsByTagStage.dao.fetchByTag(eventsByTagStage.tag, eventsByTagStage.processedEntries() == 0 ? eventsByTagStage.currentIndex() : eventsByTagStage.currentIndex() + 1, eventsByTagStage.targetIndex());
    }

    public EventsByTagStage(QueryDao queryDao, String str, long j, Option<FiniteDuration> option) {
        this.dao = queryDao;
        this.tag = str;
        this.refreshInterval = option;
        Predef$.MODULE$.require(queryDao != null, () -> {
            return "the 'dao' must be provided";
        });
        Predef$.MODULE$.require(str != null && StringOps$.MODULE$.nonEmpty$extension(Predef$.MODULE$.augmentString(str)), () -> {
            return "the 'tag' must be provided";
        });
        Predef$.MODULE$.require(j >= 0, () -> {
            return "the 'offset' must be >= 0";
        });
        this.processedEntries = 0L;
        this.currentIndex = j;
        this.targetIndex = 0L;
        this.completeSwitch = new AtomicBoolean();
    }
}
