/*
 * Decompiled with CFR 0.152.
 */
package kz.greetgo.spring.websocket.mongo;

import com.mongodb.MongoException;
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import kz.greetgo.spring.websocket.interfaces.NeedClose;
import kz.greetgo.spring.websocket.mongo.MongoErrorFilters;
import kz.greetgo.spring.websocket.util.ConsoleColors;
import kz.greetgo.spring.websocket.util.LoggingUtil;
import org.bson.BsonDocument;
import org.slf4j.Logger;

public abstract class AbstractMongoWatcher<DataObject>
implements NeedClose {
    private final Logger callingLog = LoggingUtil.callingLog;
    public final String sessionId;
    private final Thread watchingThread;
    private final AtomicBoolean opened = new AtomicBoolean(true);
    private final AtomicReference<BsonDocument> resumeToken = new AtomicReference<Object>(null);

    public AbstractMongoWatcher(String sessionId) {
        this.sessionId = sessionId;
        this.watchingThread = new Thread(this::watch);
    }

    protected void start() {
        this.watchingThread.start();
        if (this.callingLog.isInfoEnabled()) {
            this.callingLog.info(ConsoleColors.CYAN_BOLD() + "WATCH     " + ConsoleColors.RESET() + this.sessionId + ConsoleColors.CYAN_BOLD() + " " + this.inWatchPlace() + ConsoleColors.RESET() + " Thread=" + Thread.currentThread().getName());
        }
    }

    @Override
    public void close() {
        this.opened.set(false);
        this.watchingThread.interrupt();
        if (this.callingLog.isInfoEnabled()) {
            this.callingLog.info(ConsoleColors.CYAN_BOLD() + "UNWATCH   " + ConsoleColors.RESET() + this.sessionId + ConsoleColors.CYAN_BOLD() + " " + this.inWatchPlace() + ConsoleColors.RESET() + " Thread=" + Thread.currentThread().getName());
        }
    }

    protected abstract String inWatchPlace();

    protected abstract MongoChangeStreamCursor<ChangeStreamDocument<DataObject>> createCursor(BsonDocument var1);

    protected abstract void acceptDoc(ChangeStreamDocument<DataObject> var1);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void watch() {
        if (this.callingLog.isInfoEnabled()) {
            this.callingLog.info("IN WATCH  " + this.sessionId + ConsoleColors.CYAN_BOLD() + " " + this.inWatchPlace() + ConsoleColors.RESET() + " Thread=" + Thread.currentThread().getName());
        }
        while (this.opened.get()) {
            try {
                AtomicReference<Object> cursorRef = new AtomicReference<Object>(null);
                try {
                    MongoChangeStreamCursor<ChangeStreamDocument<DataObject>> cursor = this.createCursor(this.resumeToken.get());
                    try {
                        cursorRef.set(cursor);
                        if (this.callingLog.isInfoEnabled()) {
                            this.callingLog.info("NEW CUR   " + this.sessionId + ConsoleColors.GREEN_BOLD() + " cursor.addr=" + System.identityHashCode(cursor) + ConsoleColors.RESET() + " Thread=" + Thread.currentThread().getName());
                        }
                        cursor.forEachRemaining(doc -> {
                            this.resumeToken.set(doc.getResumeToken());
                            this.acceptDoc((ChangeStreamDocument<DataObject>)doc);
                        });
                    }
                    finally {
                        if (cursor == null) continue;
                        cursor.close();
                    }
                }
                catch (MongoException e) {
                    MongoErrorFilters.mongo(e).ignoreStateShouldBeOpened(true).ignoreCursorHasBeenClosed(true).ignoreMongoInterrupted(true).check();
                }
                finally {
                    MongoChangeStreamCursor cc = cursorRef.get();
                    if (cc == null || !this.callingLog.isInfoEnabled()) continue;
                    this.callingLog.info("CLO CUR   " + this.sessionId + ConsoleColors.RED_BOLD() + " cursor.addr=" + System.identityHashCode(cc) + ConsoleColors.RESET() + " Thread=" + Thread.currentThread().getName());
                }
            }
            catch (Exception e) {
                this.callingLog.error(ConsoleColors.RED_BOLD() + "ERROR    " + ConsoleColors.RESET() + " " + this.sessionId + " " + ConsoleColors.RED() + " \u041e\u0448\u0438\u0431\u043a\u0430 \u0432 \u0441\u043c\u043e\u0442\u0440\u0438\u0442\u0435\u043b\u0435: " + e.getClass().getName() + " : " + e.getMessage() + ConsoleColors.RESET(), (Throwable)e);
            }
        }
        if (this.callingLog.isInfoEnabled()) {
            this.callingLog.info("NO WATCH  " + this.sessionId + ConsoleColors.CYAN_BOLD() + " " + this.inWatchPlace() + ConsoleColors.RESET() + " Thread=" + Thread.currentThread().getName());
        }
    }

    public void join() {
        try {
            this.watchingThread.join();
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }
}

