/*
 * Decompiled with CFR 0.152.
 */
package org.mbari.vcr4j.sharktopoda.client.localization;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.time.Duration;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.mbari.vcr4j.sharktopoda.client.gson.DurationConverter;
import org.mbari.vcr4j.sharktopoda.client.localization.LocalizationController;
import org.mbari.vcr4j.sharktopoda.client.localization.Message;
import org.mbari.vcr4j.sharktopoda.client.localization.SelectionController;
import org.mbari.vcr4j.util.StringUtils;
import org.zeromq.SocketType;
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMQException;

public class IO {
    private static final System.Logger log = System.getLogger(IO.class.getName());
    private ZContext context = new ZContext();
    private final int incomingPort;
    private final int outgoingPort;
    private final LocalizationController controller;
    private final SelectionController selectionController;
    private final Thread outgoingThread;
    private final Thread incomingThread;
    private volatile boolean ok = true;
    private LinkedBlockingQueue<Message> queue = new LinkedBlockingQueue();
    private final Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'").registerTypeAdapter(Duration.class, (Object)new DurationConverter()).create();
    private final String sourceId = StringUtils.randomString((int)10);

    public IO(int incomingPort, int outgoingPort, String incomingTopic, String outgoingTopic, LocalizationController controller) {
        this.incomingPort = incomingPort;
        this.outgoingPort = outgoingPort;
        this.controller = controller;
        this.controller.getOutgoing().ofType(Message.class).subscribe(lcl -> this.queue.offer((Message)lcl));
        this.selectionController = new SelectionController(controller);
        this.outgoingThread = new Thread(() -> {
            String address = "tcp://*:" + outgoingPort;
            log.log(System.Logger.Level.INFO, () -> "ZeroMQ Publishing to " + address + " using topic '" + outgoingTopic + "'");
            ZMQ.Socket publisher = this.context.createSocket(SocketType.PUB);
            publisher.bind(address);
            try {
                Thread.sleep(1000L);
            }
            catch (InterruptedException e) {
                log.log(System.Logger.Level.WARNING, "ZeroMQ publisher thread was interrupted", (Throwable)e);
            }
            while (this.ok && !Thread.currentThread().isInterrupted()) {
                try {
                    Message msg = this.queue.poll(1L, TimeUnit.SECONDS);
                    if (msg == null) continue;
                    String json = this.gson.toJson((Object)msg);
                    log.log(System.Logger.Level.DEBUG, "Publishing to '" + outgoingTopic + "': \n" + json);
                    publisher.sendMore(outgoingTopic);
                    publisher.send(json);
                }
                catch (InterruptedException e) {
                    log.log(System.Logger.Level.WARNING, "ZeroMQ Publisher thread was interrupted", (Throwable)e);
                    this.ok = false;
                }
                catch (Exception e) {
                    log.log(System.Logger.Level.WARNING, "An exception was thrown will attempting to publish a localization", (Throwable)e);
                }
            }
            log.log(System.Logger.Level.INFO, "Shutting down ZeroMQ publisher at " + address);
            publisher.close();
        });
        this.outgoingThread.setDaemon(true);
        this.outgoingThread.start();
        this.incomingThread = new Thread(() -> {
            String address = "tcp://localhost:" + incomingPort;
            log.log(System.Logger.Level.INFO, "ZeroMQ Subscribing to " + address + " using topic '" + outgoingTopic + "'");
            ZMQ.Socket socket = this.context.createSocket(SocketType.SUB);
            socket.connect(address);
            socket.subscribe(incomingTopic.getBytes(ZMQ.CHARSET));
            while (this.ok && !Thread.currentThread().isInterrupted()) {
                try {
                    String topicAddress = socket.recvStr();
                    String contents = socket.recvStr();
                    log.log(System.Logger.Level.DEBUG, () -> "Received on '" + topicAddress + "':" + contents);
                    Message message = (Message)this.gson.fromJson(contents, Message.class);
                    controller.getIncoming().onNext((Object)message);
                }
                catch (ZMQException e) {
                    if (e.getErrorCode() == 156384765) continue;
                    log.log(System.Logger.Level.WARNING, "An exception occurred while reading from remote app", (Throwable)e);
                }
                catch (Exception e) {
                    log.log(System.Logger.Level.WARNING, "An exception occurred while reading from remote app", (Throwable)e);
                }
            }
        });
        this.incomingThread.setDaemon(true);
        this.incomingThread.start();
    }

    public IO(int incomingPort, int outgoingPort, String incomingTopic, String outgoingTopic) {
        this(incomingPort, outgoingPort, incomingTopic, outgoingTopic, new LocalizationController());
    }

    public int getIncomingPort() {
        return this.incomingPort;
    }

    public int getOutgoingPort() {
        return this.outgoingPort;
    }

    public LocalizationController getController() {
        return this.controller;
    }

    public SelectionController getSelectionController() {
        return this.selectionController;
    }

    public void publish(Message msg) {
        this.controller.getOutgoing().onNext((Object)msg);
    }

    public void close() {
        this.ok = false;
        this.context.close();
        this.controller.getIncoming().onComplete();
        this.controller.getOutgoing().onComplete();
    }

    public Gson getGson() {
        return this.gson;
    }

    public String getSourceId() {
        return this.sourceId;
    }
}

