package com.daml.platform.store.dao.events;

import akka.NotUsed;
import akka.stream.Attributes$;
import akka.stream.scaladsl.Source;
import com.daml.ledger.api.TraceIdentifiers$;
import com.daml.ledger.api.v1.event.Event;
import com.daml.ledger.api.v1.transaction.Transaction;
import com.daml.ledger.api.v1.transaction_service.GetTransactionsResponse;
import com.daml.ledger.offset.Offset;
import com.daml.logging.ContextualizedLogger;
import com.daml.logging.ContextualizedLogger$;
import com.daml.logging.LoggingContext;
import com.daml.metrics.DatabaseMetrics;
import com.daml.metrics.IndexMetrics$db$;
import com.daml.metrics.Metrics;
import com.daml.metrics.Timed$;
import com.daml.platform.TemplatePartiesFilter;
import com.daml.platform.configuration.TransactionFlatStreamsConfig;
import com.daml.platform.indexer.parallel.BatchN$;
import com.daml.platform.store.backend.EventStorageBackend;
import com.daml.platform.store.backend.common.EventIdSourceForStakeholders;
import com.daml.platform.store.backend.common.EventIdSourceForStakeholders$Consuming$;
import com.daml.platform.store.backend.common.EventIdSourceForStakeholders$Create$;
import com.daml.platform.store.backend.common.EventPayloadSourceForFlatTx;
import com.daml.platform.store.backend.common.EventPayloadSourceForFlatTx$Consuming$;
import com.daml.platform.store.backend.common.EventPayloadSourceForFlatTx$Create$;
import com.daml.platform.store.dao.DbDispatcher;
import com.daml.platform.store.dao.EventProjectionProperties;
import com.daml.platform.store.dao.PaginatingAsyncStream$;
import com.daml.platform.store.dao.events.Raw;
import com.daml.platform.store.utils.ConcurrencyLimiter;
import com.daml.platform.store.utils.QueueBasedConcurrencyLimiter;
import com.daml.platform.store.utils.Telemetry$Transactions$;
import com.daml.tracing.Spans$;
import io.opentelemetry.api.trace.Span;
import scala.Function0;
import scala.MatchError;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.BuildFrom$;
import scala.collection.immutable.Vector;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.math.Ordering;
import scala.math.Ordering$Long$;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.ChainingOps$;
import scala.util.package$chaining$;

/* compiled from: TransactionsFlatStreamReader.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005Eh\u0001\u0002\u000b\u0016\u0001\tB\u0001\"\u000b\u0001\u0003\u0002\u0003\u0006IA\u000b\u0005\ta\u0001\u0011\t\u0011)A\u0005c!Aq\u0007\u0001B\u0001B\u0003%\u0011\u0007\u0003\u00059\u0001\t\u0005\t\u0015!\u0003:\u0011!i\u0004A!A!\u0002\u0013q\u0004\u0002\u0003\"\u0001\u0005\u0003\u0005\u000b\u0011B\"\t\u0011%\u0003!\u0011!Q\u0001\n)C\u0001\"\u0014\u0001\u0003\u0002\u0003\u0006IA\u0014\u0005\t'\u0002\u0011\t\u0011)A\u0006)\")!\f\u0001C\u00017\"9q\r\u0001b\u0001\n\u0013A\u0007BB8\u0001A\u0003%\u0011\u000eC\u0004q\u0001\t\u0007I\u0011B9\t\r}\u0004\u0001\u0015!\u0003s\u0011%\t\t\u0001\u0001b\u0001\n\u0013\t\u0019\u0001\u0003\u0005\u0002.\u0001\u0001\u000b\u0011BA\u0003\u0011\u001d\ty\u0003\u0001C\u0001\u0003cAq!!*\u0001\t\u0013\t9\u000bC\u0004\u00024\u0002!I!!.\u00039Q\u0013\u0018M\\:bGRLwN\\:GY\u0006$8\u000b\u001e:fC6\u0014V-\u00193fe*\u0011acF\u0001\u0007KZ,g\u000e^:\u000b\u0005aI\u0012a\u00013b_*\u0011!dG\u0001\u0006gR|'/\u001a\u0006\u00039u\t\u0001\u0002\u001d7bi\u001a|'/\u001c\u0006\u0003=}\tA\u0001Z1nY*\t\u0001%A\u0002d_6\u001c\u0001a\u0005\u0002\u0001GA\u0011AeJ\u0007\u0002K)\ta%A\u0003tG\u0006d\u0017-\u0003\u0002)K\t1\u0011I\\=SK\u001a\faaY8oM&<\u0007CA\u0016/\u001b\u0005a#BA\u0017\u001c\u00035\u0019wN\u001c4jOV\u0014\u0018\r^5p]&\u0011q\u0006\f\u0002\u001d)J\fgn]1di&|gN\u00127biN#(/Z1ng\u000e{gNZ5h\u0003Y9Gn\u001c2bY&#\u0017+^3sS\u0016\u001cH*[7ji\u0016\u0014\bC\u0001\u001a6\u001b\u0005\u0019$B\u0001\u001b\u001a\u0003\u0015)H/\u001b7t\u0013\t14G\u0001\nD_:\u001cWO\u001d:f]\u000eLH*[7ji\u0016\u0014\u0018aG4m_\n\fG\u000eU1zY>\fG-U;fe&,7\u000fT5nSR,'/\u0001\u0007eE\u0012K7\u000f]1uG\",'\u000f\u0005\u0002;w5\tq#\u0003\u0002=/\taAI\u0019#jgB\fGo\u00195fe\u0006q\u0011/^3ss:{g\u000e\u0015:v]\u0016$\u0007CA A\u001b\u0005)\u0012BA!\u0016\u00059\tV/\u001a:z\u001d>t\u0007K];oK\u0012\f1#\u001a<f]R\u001cFo\u001c:bO\u0016\u0014\u0015mY6f]\u0012\u0004\"\u0001R$\u000e\u0003\u0015S!AR\r\u0002\u000f\t\f7m[3oI&\u0011\u0001*\u0012\u0002\u0014\u000bZ,g\u000e^*u_J\fw-\u001a\"bG.,g\u000eZ\u0001\u0013Y\u001a4\u0016\r\\;f)J\fgn\u001d7bi&|g\u000e\u0005\u0002@\u0017&\u0011A*\u0006\u0002\u0013\u0019\u001a4\u0016\r\\;f)J\fgn\u001d7bi&|g.A\u0004nKR\u0014\u0018nY:\u0011\u0005=\u000bV\"\u0001)\u000b\u00055k\u0012B\u0001*Q\u0005\u001diU\r\u001e:jGN\f\u0001#\u001a=fGV$\u0018n\u001c8D_:$X\r\u001f;\u0011\u0005UCV\"\u0001,\u000b\u0005]+\u0013AC2p]\u000e,(O]3oi&\u0011\u0011L\u0016\u0002\u0011\u000bb,7-\u001e;j_:\u001cuN\u001c;fqR\fa\u0001P5oSRtD#\u0003/`A\u0006\u00147\rZ3g)\tif\f\u0005\u0002@\u0001!)1K\u0003a\u0002)\")\u0011F\u0003a\u0001U!)\u0001G\u0003a\u0001c!)qG\u0003a\u0001c!)\u0001H\u0003a\u0001s!)QH\u0003a\u0001}!)!I\u0003a\u0001\u0007\")\u0011J\u0003a\u0001\u0015\")QJ\u0003a\u0001\u001d\u00061An\\4hKJ,\u0012!\u001b\t\u0003U6l\u0011a\u001b\u0006\u0003Yv\tq\u0001\\8hO&tw-\u0003\u0002oW\n!2i\u001c8uKb$X/\u00197ju\u0016$Gj\\4hKJ\fq\u0001\\8hO\u0016\u0014\b%A\u0005eE6+GO]5dgV\t!O\u0004\u0002tw:\u0011A\u000f\u001f\b\u0003k^t!A\u001e\u0005\u000e\u0003\u0001I!AH)\n\u0005eT\u0018!B5oI\u0016D(B\u0001\u0010R\u0013\taX0\u0001\u0002eE&\u0011a\u0010\u0015\u0002\r\u0013:$W\r_'fiJL7m]\u0001\u000bI\nlU\r\u001e:jGN\u0004\u0013\u0001G8sI\u0016\u0014()_*fcV,g\u000e^5bY\u00163XM\u001c;JIV\u0011\u0011Q\u0001\t\u0007\u0003\u000f\ti!!\u0005\u000e\u0005\u0005%!bAA\u0006K\u0005!Q.\u0019;i\u0013\u0011\ty!!\u0003\u0003\u0011=\u0013H-\u001a:j]\u001e\u0004b!a\u0005\u0002\u001a\u0005}ab\u0001#\u0002\u0016%\u0019\u0011qC#\u0002'\u00153XM\u001c;Ti>\u0014\u0018mZ3CC\u000e\\WM\u001c3\n\t\u0005m\u0011Q\u0004\u0002\u0006\u000b:$(/\u001f\u0006\u0004\u0003/)\u0005\u0003BA\u0011\u0003Oq1aPA\u0012\u0013\r\t)#F\u0001\u0004%\u0006<\u0018\u0002BA\u0015\u0003W\u0011\u0011B\u00127bi\u00163XM\u001c;\u000b\u0007\u0005\u0015R#A\rpe\u0012,'OQ=TKF,XM\u001c;jC2,e/\u001a8u\u0013\u0012\u0004\u0013AF:ue\u0016\fWN\u00127biR\u0013\u0018M\\:bGRLwN\\:\u0015\u0011\u0005M\u0012QQAH\u00037#B!!\u000e\u0002|AA\u0011qGA#\u0003\u0013\n\u0019(\u0004\u0002\u0002:)!\u00111HA\u001f\u0003!\u00198-\u00197bINd'\u0002BA \u0003\u0003\naa\u001d;sK\u0006l'BAA\"\u0003\u0011\t7n[1\n\t\u0005\u001d\u0013\u0011\b\u0002\u0007'>,(oY3\u0011\u000f\u0011\nY%a\u0014\u0002`%\u0019\u0011QJ\u0013\u0003\rQ+\b\u000f\\33!\u0011\t\t&a\u0017\u000e\u0005\u0005M#\u0002BA+\u0003/\naa\u001c4gg\u0016$(bAA-;\u00051A.\u001a3hKJLA!!\u0018\u0002T\t1qJ\u001a4tKR\u0004B!!\u0019\u0002p5\u0011\u00111\r\u0006\u0005\u0003K\n9'A\nue\u0006t7/Y2uS>twl]3sm&\u001cWM\u0003\u0003\u0002j\u0005-\u0014A\u0001<2\u0015\u0011\ti'a\u0016\u0002\u0007\u0005\u0004\u0018.\u0003\u0003\u0002r\u0005\r$aF$fiR\u0013\u0018M\\:bGRLwN\\:SKN\u0004xN\\:f!\u0011\t)(a\u001e\u000e\u0005\u0005\u0005\u0013\u0002BA=\u0003\u0003\u0012qAT8u+N,G\rC\u0004\u0002~E\u0001\u001d!a \u0002\u001d1|wmZ5oO\u000e{g\u000e^3yiB\u0019!.!!\n\u0007\u0005\r5N\u0001\bM_\u001e<\u0017N\\4D_:$X\r\u001f;\t\u000f\u0005\u001d\u0015\u00031\u0001\u0002\n\u0006Q\u0011/^3ssJ\u000bgnZ3\u0011\u0007}\nY)C\u0002\u0002\u000eV\u00111\"\u0012<f]R\u001c(+\u00198hK\"9\u0011\u0011S\tA\u0002\u0005M\u0015\u0001\u00064jYR,'/\u001b8h\u0007>t7\u000f\u001e:bS:$8\u000f\u0005\u0003\u0002\u0016\u0006]U\"A\u000e\n\u0007\u0005e5DA\u000bUK6\u0004H.\u0019;f!\u0006\u0014H/[3t\r&dG/\u001a:\t\u000f\u0005u\u0015\u00031\u0001\u0002 \u0006IRM^3oiB\u0013xN[3di&|g\u000e\u0015:pa\u0016\u0014H/[3t!\rQ\u0014\u0011U\u0005\u0004\u0003G;\"!G#wK:$\bK]8kK\u000e$\u0018n\u001c8Qe>\u0004XM\u001d;jKN\f\u0001\u0004Z8TiJ,\u0017-\u001c$mCR$&/\u00198tC\u000e$\u0018n\u001c8t)!\tI+!,\u00020\u0006EF\u0003BA\u001b\u0003WCq!! \u0013\u0001\b\ty\bC\u0004\u0002\bJ\u0001\r!!#\t\u000f\u0005E%\u00031\u0001\u0002\u0014\"9\u0011Q\u0014\nA\u0002\u0005}\u0015a\u00053fg\u0016\u0014\u0018.\u00197ju\u0016deMV1mk\u0016\u001cHCBA\\\u0003S\fy\u000f\u0006\u0003\u0002:\u0006\u0015\b#B+\u0002<\u0006}\u0016bAA_-\n1a)\u001e;ve\u0016\u0004b!!1\u0002R\u0006]g\u0002BAb\u0003\u001btA!!2\u0002L6\u0011\u0011q\u0019\u0006\u0004\u0003\u0013\f\u0013A\u0002\u001fs_>$h(C\u0001'\u0013\r\ty-J\u0001\ba\u0006\u001c7.Y4f\u0013\u0011\t\u0019.!6\u0003\rY+7\r^8s\u0015\r\ty-\n\t\u0007\u0003'\tI\"!7\u0011\t\u0005m\u0017\u0011]\u0007\u0003\u0003;TA!a8\u0002h\u0005)QM^3oi&!\u00111]Ao\u0005\u0015)e/\u001a8u\u0011\u001d\t9o\u0005a\u0002\u0003\u007f\n!\u0001\\2\t\u000f\u0005-8\u00031\u0001\u0002n\u0006I!/Y<Fm\u0016tGo\u001d\t\u0007\u0003\u0003\f\t.!\u0005\t\u000f\u0005u5\u00031\u0001\u0002 \u0002")
/* loaded from: input_file:com/daml/platform/store/dao/events/TransactionsFlatStreamReader.class */
public class TransactionsFlatStreamReader {
    private final TransactionFlatStreamsConfig config;
    private final ConcurrencyLimiter globalIdQueriesLimiter;
    private final ConcurrencyLimiter globalPayloadQueriesLimiter;
    private final DbDispatcher dbDispatcher;
    private final QueryNonPruned queryNonPruned;
    private final EventStorageBackend eventStorageBackend;
    private final LfValueTranslation lfValueTranslation;
    private final ExecutionContext executionContext;
    private final IndexMetrics$db$ dbMetrics;
    private final ContextualizedLogger logger = ContextualizedLogger$.MODULE$.get(getClass());
    private final Ordering<EventStorageBackend.Entry<Raw.FlatEvent>> orderBySequentialEventId = package$.MODULE$.Ordering().by(entry -> {
        return BoxesRunTime.boxToLong(entry.eventSequentialId());
    }, Ordering$Long$.MODULE$);

    private ContextualizedLogger logger() {
        return this.logger;
    }

    private IndexMetrics$db$ dbMetrics() {
        return this.dbMetrics;
    }

    private Ordering<EventStorageBackend.Entry<Raw.FlatEvent>> orderBySequentialEventId() {
        return this.orderBySequentialEventId;
    }

    public Source<Tuple2<Offset, GetTransactionsResponse>, NotUsed> streamFlatTransactions(EventsRange eventsRange, TemplatePartiesFilter templatePartiesFilter, EventProjectionProperties eventProjectionProperties, LoggingContext loggingContext) {
        Span createSpan = Telemetry$Transactions$.MODULE$.createSpan(eventsRange.startExclusiveOffset(), eventsRange.endInclusiveOffset(), "com.daml.platform.store.dao.events.TransactionsFlatStreamReader.streamFlatTransactions");
        logger().debug().apply(() -> {
            return new StringBuilder(30).append("streamFlatTransactions(").append(eventsRange.startExclusiveOffset()).append(", ").append(eventsRange.endInclusiveOffset()).append(", ").append(templatePartiesFilter).append(", ").append(eventProjectionProperties).append(")").toString();
        }, loggingContext);
        return doStreamFlatTransactions(eventsRange, templatePartiesFilter, eventProjectionProperties, loggingContext).wireTap(tuple2 -> {
            $anonfun$streamFlatTransactions$2(createSpan, tuple2);
            return BoxedUnit.UNIT;
        }).watchTermination((notUsed, future) -> {
            return (NotUsed) TransactionsReader$.MODULE$.endSpanOnTermination(createSpan, notUsed, future, this.executionContext);
        });
    }

    private Source<Tuple2<Offset, GetTransactionsResponse>, NotUsed> doStreamFlatTransactions(EventsRange eventsRange, TemplatePartiesFilter templatePartiesFilter, EventProjectionProperties eventProjectionProperties, LoggingContext loggingContext) {
        QueueBasedConcurrencyLimiter queueBasedConcurrencyLimiter = new QueueBasedConcurrencyLimiter(this.config.maxParallelIdCreateQueries(), this.executionContext);
        QueueBasedConcurrencyLimiter queueBasedConcurrencyLimiter2 = new QueueBasedConcurrencyLimiter(this.config.maxParallelIdConsumingQueries(), this.executionContext);
        QueueBasedConcurrencyLimiter queueBasedConcurrencyLimiter3 = new QueueBasedConcurrencyLimiter(this.config.maxParallelPayloadQueries(), this.executionContext);
        Vector vector = FilterUtils$.MODULE$.decomposeFilters(templatePartiesFilter).toVector();
        IdPageSizing calculateFrom = IdPageSizing$.MODULE$.calculateFrom(this.config.maxIdsPerIdPage(), this.config.maxWorkingMemoryInBytesForIdPages() / 2, vector.size(), this.config.maxPagesPerIdPagesBuffer(), loggingContext);
        Source fetchIds$1 = fetchIds$1(EventIdSourceForStakeholders$Create$.MODULE$, queueBasedConcurrencyLimiter, this.config.maxParallelPayloadCreateQueries() + 1, dbMetrics().flatTxStream().fetchEventCreateIdsStakeholder(), vector, calculateFrom, eventsRange, loggingContext);
        Source fetchIds$12 = fetchIds$1(EventIdSourceForStakeholders$Consuming$.MODULE$, queueBasedConcurrencyLimiter2, this.config.maxParallelPayloadConsumingQueries() + 1, dbMetrics().flatTxStream().fetchEventConsumingIdsStakeholder(), vector, calculateFrom, eventsRange, loggingContext);
        return TransactionsReader$.MODULE$.groupContiguous(fetchPayloads$1(fetchIds$12, EventPayloadSourceForFlatTx$Consuming$.MODULE$, this.config.maxParallelPayloadConsumingQueries(), dbMetrics().flatTxStream().fetchEventConsumingPayloads(), queueBasedConcurrencyLimiter3, templatePartiesFilter, eventsRange, loggingContext).mergeSorted(fetchPayloads$1(fetchIds$1, EventPayloadSourceForFlatTx$Create$.MODULE$, this.config.maxParallelPayloadCreateQueries(), dbMetrics().flatTxStream().fetchEventCreatePayloads(), queueBasedConcurrencyLimiter3, templatePartiesFilter, eventsRange, loggingContext), orderBySequentialEventId()), entry -> {
            return entry.transactionId();
        }).mapAsync(this.config.transactionsProcessingParallelism(), vector2 -> {
            return this.deserializeLfValues(vector2, eventProjectionProperties, loggingContext);
        }).mapConcat(vector3 -> {
            return EventsTable$TransactionConversions$.MODULE$.toGetTransactionsResponse(vector3).map(getTransactionsResponse -> {
                return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(TransactionsReader$.MODULE$.offsetFor(getTransactionsResponse)), getTransactionsResponse);
            });
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Future<Vector<EventStorageBackend.Entry<Event>>> deserializeLfValues(Vector<EventStorageBackend.Entry<Raw.FlatEvent>> vector, EventProjectionProperties eventProjectionProperties, LoggingContext loggingContext) {
        Function0 function0 = () -> {
            return Future$.MODULE$.traverse(vector, entry -> {
                return TransactionsReader$.MODULE$.deserializeEntry(eventProjectionProperties, this.lfValueTranslation, entry, loggingContext, this.executionContext);
            }, BuildFrom$.MODULE$.buildFromIterableOps(), this.executionContext);
        };
        return Timed$.MODULE$.future(dbMetrics().flatTxStream().translationTimer(), function0);
    }

    public static final /* synthetic */ void $anonfun$streamFlatTransactions$3(Span span, Transaction transaction) {
        Spans$.MODULE$.addEventToSpan(new com.daml.tracing.Event("transaction", TraceIdentifiers$.MODULE$.fromTransaction(transaction)), span);
    }

    public static final /* synthetic */ void $anonfun$streamFlatTransactions$2(Span span, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        ((GetTransactionsResponse) tuple2._2()).transactions().foreach(transaction -> {
            $anonfun$streamFlatTransactions$3(span, transaction);
            return BoxedUnit.UNIT;
        });
        BoxedUnit boxedUnit = BoxedUnit.UNIT;
    }

    private final Source fetchIds$1(EventIdSourceForStakeholders eventIdSourceForStakeholders, QueueBasedConcurrencyLimiter queueBasedConcurrencyLimiter, int i, DatabaseMetrics databaseMetrics, Vector vector, IdPageSizing idPageSizing, EventsRange eventsRange, LoggingContext loggingContext) {
        return ((Source) ChainingOps$.MODULE$.pipe$extension(package$chaining$.MODULE$.scalaUtilChainingOps(vector.map(decomposedFilter -> {
            return PaginatingAsyncStream$.MODULE$.streamIdsFromSeekPagination(idPageSizing, this.config.maxPagesPerIdPagesBuffer(), eventsRange.startExclusiveEventSeqId(), idPaginationState -> {
                return queueBasedConcurrencyLimiter.execute(() -> {
                    return this.globalIdQueriesLimiter.execute(() -> {
                        return this.dbDispatcher.executeSql(databaseMetrics, connection -> {
                            return this.eventStorageBackend.transactionStreamingQueries().fetchEventIdsForStakeholder(eventIdSourceForStakeholders, decomposedFilter.party(), decomposedFilter.templateId(), idPaginationState.fromIdExclusive(), eventsRange.endInclusiveEventSeqId(), idPaginationState.pageSize(), connection);
                        }, loggingContext);
                    });
                });
            });
        })), vector2 -> {
            return EventIdsUtils$.MODULE$.sortAndDeduplicateIds(vector2);
        })).via(BatchN$.MODULE$.apply(this.config.maxPayloadsPerPayloadsPage(), i));
    }

    private final Source fetchPayloads$1(Source source, EventPayloadSourceForFlatTx eventPayloadSourceForFlatTx, int i, DatabaseMetrics databaseMetrics, QueueBasedConcurrencyLimiter queueBasedConcurrencyLimiter, TemplatePartiesFilter templatePartiesFilter, EventsRange eventsRange, LoggingContext loggingContext) {
        int largestSmallerOrEqualPowerOfTwo = Utils$.MODULE$.largestSmallerOrEqualPowerOfTwo(i);
        return source.async().addAttributes(Attributes$.MODULE$.inputBuffer(largestSmallerOrEqualPowerOfTwo, largestSmallerOrEqualPowerOfTwo)).mapAsync(i, arrayBuffer -> {
            return queueBasedConcurrencyLimiter.execute(() -> {
                return this.globalPayloadQueriesLimiter.execute(() -> {
                    return this.dbDispatcher.executeSql(databaseMetrics, connection -> {
                        return (Vector) this.queryNonPruned.executeSql(() -> {
                            return this.eventStorageBackend.transactionStreamingQueries().fetchEventPayloadsFlat(eventPayloadSourceForFlatTx, arrayBuffer, templatePartiesFilter.allFilterParties(), connection);
                        }, eventsRange.startExclusiveOffset(), offset -> {
                            return new StringBuilder(54).append("Transactions request from ").append(eventsRange.startExclusiveOffset().toHexString()).append(" to ").append(eventsRange.endInclusiveOffset().toHexString()).append(" precedes pruned offset ").append(offset.toHexString()).toString();
                        }, connection, loggingContext);
                    }, loggingContext);
                });
            });
        }).mapConcat(vector -> {
            return (Vector) Predef$.MODULE$.identity(vector);
        });
    }

    public TransactionsFlatStreamReader(TransactionFlatStreamsConfig transactionFlatStreamsConfig, ConcurrencyLimiter concurrencyLimiter, ConcurrencyLimiter concurrencyLimiter2, DbDispatcher dbDispatcher, QueryNonPruned queryNonPruned, EventStorageBackend eventStorageBackend, LfValueTranslation lfValueTranslation, Metrics metrics, ExecutionContext executionContext) {
        this.config = transactionFlatStreamsConfig;
        this.globalIdQueriesLimiter = concurrencyLimiter;
        this.globalPayloadQueriesLimiter = concurrencyLimiter2;
        this.dbDispatcher = dbDispatcher;
        this.queryNonPruned = queryNonPruned;
        this.eventStorageBackend = eventStorageBackend;
        this.lfValueTranslation = lfValueTranslation;
        this.executionContext = executionContext;
        this.dbMetrics = metrics.daml().index().db();
    }
}
