/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.mongodb.core.messaging;

import com.mongodb.client.MongoCursor;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.messaging.LazyMappingDelegatingMessage;
import org.springframework.data.mongodb.core.messaging.Message;
import org.springframework.data.mongodb.core.messaging.SimpleMessage;
import org.springframework.data.mongodb.core.messaging.SubscriptionRequest;
import org.springframework.data.mongodb.core.messaging.Task;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ErrorHandler;

/**
 * Base {@link Task} that opens a {@link MongoCursor}, polls it with {@link MongoCursor#tryNext()} and passes every
 * element it reads, wrapped in a {@link Message}, to the message listener of the {@link SubscriptionRequest}.
 * <p>
 * The task state and the cursor reference are only modified while holding an internal lifecycle monitor. Whenever
 * the task terminates (explicit {@link #cancel()}, interruption of the running thread, or a {@link RuntimeException}
 * escaping the read loop) the state becomes {@link Task.State#CANCELLED} and a cursor that was still in use is closed.
 *
 * @param <T> type of the elements read from the cursor.
 * @param <R> type the message body is mapped to.
 */
abstract class CursorReadingTask<T, R> implements Task {
    private final Object lifecycleMonitor = new Object();
    private final MongoTemplate template;
    private final SubscriptionRequest<T, R, SubscriptionRequest.RequestOptions> request;
    private final Class<R> targetType;
    private final ErrorHandler errorHandler;
    private final CountDownLatch awaitStart = new CountDownLatch(1);
    private Task.State state = Task.State.CREATED;
    private MongoCursor<T> cursor;

    /**
     * @param template used to open the cursor, translate exceptions and build messages.
     * @param request the subscription providing the request options and the message listener.
     * @param targetType type the message body is mapped to.
     * @param errorHandler receives exceptions raised by the listener or by the read loop.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    CursorReadingTask(MongoTemplate template, SubscriptionRequest<?, ? super T, ? extends SubscriptionRequest.RequestOptions> request, Class<R> targetType, ErrorHandler errorHandler) {
        this.template = template;
        // The wildcard-typed request cannot be assigned to the field type directly; an unchecked cast is required.
        this.request = (SubscriptionRequest) request;
        this.targetType = targetType;
        this.errorHandler = errorHandler;
    }

    /**
     * Starts the task and then polls the cursor for as long as the task is {@link Task.State#RUNNING}. Every non-null
     * element is emitted to the listener; when nothing is available the thread sleeps for 10 ms before polling again.
     * <p>
     * Interruption cancels the task, closes the cursor and restores the interrupt flag. A {@link RuntimeException}
     * cancels the task, closes the cursor and is forwarded to the {@link ErrorHandler}.
     */
    @Override
    public void run() {
        try {
            start();
            while (isRunning()) {
                try {
                    T next = execute(this::getNext);
                    if (next != null) {
                        emitMessage(createMessage(next, targetType, request.getRequestOptions()));
                    } else {
                        Thread.sleep(10L);
                    }
                } catch (InterruptedException e) {
                    abortOnInterrupt();
                    break;
                }
            }
        } catch (RuntimeException e) {
            try {
                abort();
            } finally {
                // Always report the original failure, even if releasing the cursor caused further trouble.
                errorHandler.handleError(e);
            }
        }
    }

    /**
     * Moves the task to {@link Task.State#STARTING} (unless it is already running) and keeps requesting a cursor via
     * {@link #initCursor(MongoTemplate, SubscriptionRequest.RequestOptions, Class)} until a valid one is obtained or
     * the state is no longer {@code STARTING}. An invalid cursor is closed right away and the next attempt is made
     * after 100 ms. Once the loop ends, threads blocked in {@link #awaitStart(Duration)} are released.
     */
    private void start() {
        synchronized (lifecycleMonitor) {
            if (!Task.State.RUNNING.equals(state)) {
                state = Task.State.STARTING;
            }
        }
        do {
            boolean valid = false;
            // The cursor is created under the monitor so that cancel() cannot interleave with the state transition.
            synchronized (lifecycleMonitor) {
                if (Task.State.STARTING.equals(state)) {
                    MongoCursor<T> candidate = execute(() -> initCursor(template, request.getRequestOptions(), targetType));
                    valid = isValidCursor(candidate);
                    if (valid) {
                        cursor = candidate;
                        state = Task.State.RUNNING;
                    } else if (candidate != null) {
                        candidate.close();
                    }
                }
            }
            if (!valid) {
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException e) {
                    abortOnInterrupt();
                }
            }
        } while (Task.State.STARTING.equals(getState()));
        // countDown() does nothing once the latch has reached zero, so no count check is needed.
        awaitStart.countDown();
    }

    /**
     * Opens the cursor this task reads from.
     *
     * @param template the template this task was created with.
     * @param options the request options of the subscription.
     * @param targetType the target type this task was created with.
     * @return the cursor to read from; a {@literal null} or invalid cursor makes {@link #start()} retry.
     */
    protected abstract MongoCursor<T> initCursor(MongoTemplate template, SubscriptionRequest.RequestOptions options, Class<?> targetType);

    /**
     * Cancels a task that is {@link Task.State#RUNNING} or {@link Task.State#STARTING} and closes its cursor, if
     * any. Has no effect in any other state.
     */
    @Override
    public void cancel() throws DataAccessResourceFailureException {
        synchronized (lifecycleMonitor) {
            if (Task.State.RUNNING.equals(state) || Task.State.STARTING.equals(state)) {
                state = Task.State.CANCELLED;
                if (cursor != null) {
                    cursor.close();
                }
            }
        }
    }

    /**
     * @return always {@literal true}.
     */
    @Override
    public boolean isLongLived() {
        return true;
    }

    /**
     * @return the current state, read while holding the lifecycle monitor.
     */
    @Override
    public Task.State getState() {
        synchronized (lifecycleMonitor) {
            return state;
        }
    }

    /**
     * Blocks until {@link #start()} has finished its start-up loop or the timeout elapses.
     *
     * @param timeout must not be {@literal null} nor negative.
     * @return {@literal true} if the start latch was released within the timeout, {@literal false} otherwise.
     * @throws InterruptedException if the waiting thread is interrupted.
     */
    @Override
    public boolean awaitStart(Duration timeout) throws InterruptedException {
        Assert.notNull(timeout, "Timeout must not be null");
        Assert.isTrue(!timeout.isNegative(), "Timeout must not be negative");
        return awaitStart.await(timeout.toNanos(), TimeUnit.NANOSECONDS);
    }

    /**
     * Builds the message handed to the listener: a {@link SimpleMessage} using {@code source} as both of its value
     * arguments, carrying the template's database name and the collection name from {@code options}, wrapped in a
     * {@link LazyMappingDelegatingMessage} configured with {@code targetType} and the template's converter.
     *
     * @param source the element read from the cursor.
     * @param targetType type the message body is mapped to.
     * @param options the request options providing the collection name.
     * @return the message to emit.
     */
    protected Message<T, R> createMessage(T source, Class<R> targetType, SubscriptionRequest.RequestOptions options) {
        SimpleMessage<T, T> message = new SimpleMessage<T, T>(source, source, Message.MessageProperties.builder().databaseName(template.getDb().getName()).collectionName(options.getCollectionName()).build());
        return new LazyMappingDelegatingMessage<T, R>(message, targetType, template.getConverter());
    }

    private boolean isRunning() {
        return Task.State.RUNNING.equals(getState());
    }

    /**
     * Passes the message to the subscription's listener. Exceptions thrown by the listener go to the
     * {@link ErrorHandler} instead of ending the read loop.
     */
    private void emitMessage(Message<T, R> message) {
        try {
            request.getMessageListener().onMessage(message);
        } catch (Exception e) {
            errorHandler.handleError(e);
        }
    }

    /**
     * Polls the cursor without blocking for a new element.
     *
     * @return the next element or {@literal null} if none is currently available.
     * @throws IllegalStateException if the task is not {@link Task.State#RUNNING}.
     */
    @Nullable
    private T getNext() {
        synchronized (lifecycleMonitor) {
            if (Task.State.RUNNING.equals(state)) {
                return cursor.tryNext();
            }
        }
        throw new IllegalStateException(String.format("Cursor %s is not longer open", cursor));
    }

    /**
     * Marks the task as {@link Task.State#CANCELLED} and closes the cursor if the task was still running or
     * starting, so that a task ended by interruption or failure does not leave its cursor open. A failure while
     * closing is reported to the {@link ErrorHandler}.
     */
    private void abort() {
        MongoCursor<T> toClose = null;
        synchronized (lifecycleMonitor) {
            // In any other state cancel() has already closed the cursor, or none was ever assigned.
            if (Task.State.RUNNING.equals(state) || Task.State.STARTING.equals(state)) {
                toClose = cursor;
            }
            state = Task.State.CANCELLED;
        }
        if (toClose != null) {
            try {
                toClose.close();
            } catch (RuntimeException closeFailure) {
                errorHandler.handleError(closeFailure);
            }
        }
    }

    /**
     * Aborts the task after an {@link InterruptedException} and then restores the thread's interrupt flag. The
     * cursor is closed first so the close does not run on a thread that is flagged as interrupted.
     */
    private void abortOnInterrupt() {
        try {
            abort();
        } finally {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * @return {@literal true} if the cursor is not {@literal null}, has a server cursor and that server cursor's id
     *         is not {@code 0}.
     */
    private static boolean isValidCursor(@Nullable MongoCursor<?> cursor) {
        if (cursor == null) {
            return false;
        }
        return cursor.getServerCursor() != null && cursor.getServerCursor().getId() != 0L;
    }

    /**
     * Runs the callback and, if it throws a {@link RuntimeException}, rethrows the exception translated by the
     * template's exception translator, or the original exception when no translation is available.
     */
    @Nullable
    private <V> V execute(Supplier<V> callback) {
        try {
            return callback.get();
        } catch (RuntimeException e) {
            DataAccessException translated = template.getExceptionTranslator().translateExceptionIfPossible(e);
            throw translated != null ? translated : e;
        }
    }
}

