package com.github.jukkakarvanen.kafka.streams.test;

import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Properties;
import java.util.Queue;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.test.ConsumerRecordFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/jukkakarvanen/kafka/streams/test/TopologyTestDriver.class */
public class TopologyTestDriver extends org.apache.kafka.streams.TopologyTestDriver {
    private static final Logger log = LoggerFactory.getLogger(TopologyTestDriver.class);
    private final Map<String, Queue<ProducerRecord<byte[], byte[]>>> outputRecordsByTopic;

    public TopologyTestDriver(Topology topology, Properties properties) {
        super(topology, properties);
        this.outputRecordsByTopic = new HashMap();
    }

    @Deprecated
    public TopologyTestDriver(Topology topology, Properties properties, long j) {
        super(topology, properties, j);
        this.outputRecordsByTopic = new HashMap();
    }

    public TopologyTestDriver(Topology topology, Properties properties, Instant instant) {
        this(topology, properties, instant == null ? System.currentTimeMillis() : instant.toEpochMilli());
    }

    protected final Queue<ProducerRecord<byte[], byte[]>> getRecordsQueue(String str) {
        Queue<ProducerRecord<byte[], byte[]>> computeIfAbsent = this.outputRecordsByTopic.computeIfAbsent(str, str2 -> {
            return new LinkedList();
        });
        while (true) {
            ProducerRecord<byte[], byte[]> readOutput = readOutput(str);
            if (readOutput == null) {
                return computeIfAbsent;
            }
            computeIfAbsent.add(readOutput);
        }
    }

    public void advanceWallClockTime(Duration duration) {
        Objects.requireNonNull(duration, "advance cannot be null");
        advanceWallClockTime(duration.toMillis());
    }

    public final <K, V> TestInputTopic<K, V> createInputTopic(String str, Serializer<K> serializer, Serializer<V> serializer2) {
        return new TestInputTopic<>(this, str, serializer, serializer2, Instant.now(), Duration.ZERO);
    }

    public final <K, V> TestInputTopic<K, V> createInputTopic(String str, Serializer<K> serializer, Serializer<V> serializer2, Instant instant, Duration duration) {
        return new TestInputTopic<>(this, str, serializer, serializer2, instant, duration);
    }

    public final <K, V> TestOutputTopic<K, V> createOutputTopic(String str, Deserializer<K> deserializer, Deserializer<V> deserializer2) {
        return new TestOutputTopic<>(this, str, deserializer, deserializer2);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <K, V> TestRecord<K, V> readRecord(String str, Deserializer<K> deserializer, Deserializer<V> deserializer2) {
        Queue<ProducerRecord<byte[], byte[]>> recordsQueue = getRecordsQueue(str);
        if (recordsQueue == null) {
            throw new NoSuchElementException("Uninitialized topic: " + str);
        }
        ProducerRecord<byte[], byte[]> poll = recordsQueue.poll();
        if (poll == null) {
            throw new NoSuchElementException("Empty topic: " + str);
        }
        return new TestRecord<>(deserializer.deserialize(poll.topic(), (byte[]) poll.key()), deserializer2.deserialize(poll.topic(), (byte[]) poll.value()), poll.headers(), poll.timestamp());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public <K, V> void pipeRecord(String str, TestRecord<K, V> testRecord, Serializer<K> serializer, Serializer<V> serializer2, Instant instant) {
        pipeInput(new ConsumerRecordFactory(str, serializer, serializer2).create(testRecord.key(), testRecord.value(), testRecord.headers(), instant != null ? instant.toEpochMilli() : testRecord.timestamp().longValue()));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final long getQueueSize(String str) {
        if (getRecordsQueue(str) == null) {
            return 0L;
        }
        return r0.size();
    }
}
