/*
 * Decompiled with CFR 0.152.
 */
package org.graylog.shaded.kafka09.kafka.tools;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.graylog.shaded.kafka09.joptsimple.ArgumentAcceptingOptionSpec;
import org.graylog.shaded.kafka09.joptsimple.OptionParser;
import org.graylog.shaded.kafka09.joptsimple.OptionSet;
import org.graylog.shaded.kafka09.joptsimple.OptionSpec;
import org.graylog.shaded.kafka09.joptsimple.OptionSpecBuilder;
import org.graylog.shaded.kafka09.kafka.javaapi.producer.Producer;
import org.graylog.shaded.kafka09.kafka.producer.KeyedMessage;
import org.graylog.shaded.kafka09.kafka.producer.ProducerConfig;
import org.graylog.shaded.kafka09.org.apache.kafka.common.utils.Utils;
import org.graylog.shaded.kafka09.org.apache.log4j.Logger;

public class KafkaMigrationTool {
    // Logger used by main() and by the shutdown hook it registers.
    private static final Logger log = Logger.getLogger(KafkaMigrationTool.class.getName());
    // Fully-qualified names of the classes that main() loads reflectively from the
    // user-supplied Kafka 0.7 and zkclient jars.
    // NOTE(review): every name carries the "org.graylog.shaded.kafka09." prefix, which looks like
    // the result of package relocation; a stock Kafka 0.7 jar presumably declares these classes
    // under "kafka.*" - confirm these lookups can succeed against the intended jar.
    private static final String KAFKA_07_STATIC_CONSUMER_CLASS_NAME = "org.graylog.shaded.kafka09.kafka.consumer.Consumer";
    private static final String KAFKA_07_CONSUMER_CONFIG_CLASS_NAME = "org.graylog.shaded.kafka09.kafka.consumer.ConsumerConfig";
    private static final String KAFKA_07_CONSUMER_STREAM_CLASS_NAME = "org.graylog.shaded.kafka09.kafka.consumer.KafkaStream";
    private static final String KAFKA_07_CONSUMER_ITERATOR_CLASS_NAME = "org.graylog.shaded.kafka09.kafka.consumer.ConsumerIterator";
    private static final String KAFKA_07_CONSUMER_CONNECTOR_CLASS_NAME = "org.graylog.shaded.kafka09.kafka.javaapi.consumer.ConsumerConnector";
    private static final String KAFKA_07_MESSAGE_AND_METADATA_CLASS_NAME = "org.graylog.shaded.kafka09.kafka.message.MessageAndMetadata";
    private static final String KAFKA_07_MESSAGE_CLASS_NAME = "org.graylog.shaded.kafka09.kafka.message.Message";
    private static final String KAFKA_07_WHITE_LIST_CLASS_NAME = "org.graylog.shaded.kafka09.kafka.consumer.Whitelist";
    private static final String KAFKA_07_TOPIC_FILTER_CLASS_NAME = "org.graylog.shaded.kafka09.kafka.consumer.TopicFilter";
    private static final String KAFKA_07_BLACK_LIST_CLASS_NAME = "org.graylog.shaded.kafka09.kafka.consumer.Blacklist";
    // Class handles resolved by main() through its ParentLastURLClassLoader. They stay null
    // until main() has loaded them. MigrationThread.run() reads KafkaMessageClass_07,
    // KafkaMessageAndMetatDataClass_07, KafkaStream_07 and KafkaConsumerIteratorClass_07 to look
    // up the methods it invokes, so those threads must only be started after main() assigned them.
    private static Class<?> KafkaStaticConsumer_07 = null;
    private static Class<?> ConsumerConfig_07 = null;
    private static Class<?> ConsumerConnector_07 = null;
    private static Class<?> KafkaStream_07 = null;
    private static Class<?> TopicFilter_07 = null;
    private static Class<?> WhiteList_07 = null;
    private static Class<?> BlackList_07 = null;
    private static Class<?> KafkaConsumerIteratorClass_07 = null;
    private static Class<?> KafkaMessageAndMetatDataClass_07 = null;
    private static Class<?> KafkaMessageClass_07 = null;

    /**
     * Command-line entry point of the migration tool.
     *
     * <p>Parses the options, loads the consumer-side classes reflectively from the supplied
     * Kafka 0.7 and zkclient jars, creates the filtered message streams, and then starts one
     * {@link MigrationThread} per stream and {@code num.producers} {@link ProducerThread}s that
     * are connected through a single bounded {@link ProducerDataChannel}. A JVM shutdown hook
     * stops the consumer connector, then the migration threads, then the producer threads.
     *
     * <p>The process exits with status 0 after printing help, and with status 1 when a required
     * option is missing or when not exactly one of {@code whitelist}/{@code blacklist} is given.
     * Any failure during setup is printed to standard out and logged; it is not rethrown.
     *
     * @param args command-line arguments
     * @throws InterruptedException declared for callers; setup failures are caught and logged
     * @throws IOException if the help text cannot be written
     */
    public static void main(String[] args) throws InterruptedException, IOException {
        OptionParser parser = new OptionParser();
        ArgumentAcceptingOptionSpec<String> consumerConfigOpt = parser.accepts("consumer.config", "Kafka 0.7 consumer config to consume from the source 0.7 cluster. You man specify multiple of these.").withRequiredArg().describedAs("config file").ofType(String.class);
        ArgumentAcceptingOptionSpec<String> producerConfigOpt = parser.accepts("producer.config", "Producer config.").withRequiredArg().describedAs("config file").ofType(String.class);
        ArgumentAcceptingOptionSpec<Integer> numProducersOpt = parser.accepts("num.producers", "Number of producer instances").withRequiredArg().describedAs("Number of producers").ofType(Integer.class).defaultsTo(1, (Integer[])new Integer[0]);
        ArgumentAcceptingOptionSpec<String> zkClient01JarOpt = parser.accepts("zkclient.01.jar", "zkClient 0.1 jar file").withRequiredArg().describedAs("zkClient 0.1 jar file required by Kafka 0.7").ofType(String.class);
        ArgumentAcceptingOptionSpec<String> kafka07JarOpt = parser.accepts("org.graylog.shaded.kafka09.kafka.07.jar", "Kafka 0.7 jar file").withRequiredArg().describedAs("org.graylog.shaded.kafka09.kafka 0.7 jar").ofType(String.class);
        ArgumentAcceptingOptionSpec<Integer> numStreamsOpt = parser.accepts("num.streams", "Number of consumer streams").withRequiredArg().describedAs("Number of consumer threads").ofType(Integer.class).defaultsTo(1, (Integer[])new Integer[0]);
        ArgumentAcceptingOptionSpec<String> whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to migrate from the 0.7 cluster").withRequiredArg().describedAs("Java regex (String)").ofType(String.class);
        ArgumentAcceptingOptionSpec<String> blacklistOpt = parser.accepts("blacklist", "Blacklist of topics to migrate from the 0.7 cluster").withRequiredArg().describedAs("Java regex (String)").ofType(String.class);
        ArgumentAcceptingOptionSpec<Integer> queueSizeOpt = parser.accepts("queue.size", "Number of messages that are buffered between the 0.7 consumer and 0.8 producer").withRequiredArg().describedAs("Queue size in terms of number of messages").ofType(Integer.class).defaultsTo(10000, (Integer[])new Integer[0]);
        OptionSpecBuilder helpOpt = parser.accepts("help", "Print this message.");
        OptionSet options = parser.parse(args);
        if (options.has(helpOpt)) {
            parser.printHelpOn(System.out);
            System.exit(0);
        }
        KafkaMigrationTool.checkRequiredArgs(parser, options, new OptionSpec[]{consumerConfigOpt, producerConfigOpt, zkClient01JarOpt, kafka07JarOpt});
        // The topic filter is either a whitelist or a blacklist, never both and never neither.
        int whiteListCount = options.has(whitelistOpt) ? 1 : 0;
        int blackListCount = options.has(blacklistOpt) ? 1 : 0;
        if (whiteListCount + blackListCount != 1) {
            System.err.println("Exactly one of whitelist or blacklist is required.");
            System.exit(1);
        }
        String kafkaJarFile_07 = options.valueOf(kafka07JarOpt);
        String zkClientJarFile = options.valueOf(zkClient01JarOpt);
        String consumerConfigFile_07 = options.valueOf(consumerConfigOpt);
        int numConsumers = options.valueOf(numStreamsOpt);
        String producerConfigFile_08 = options.valueOf(producerConfigOpt);
        int numProducers = options.valueOf(numProducersOpt);
        // Both lists are captured by the shutdown hook below, hence final.
        final ArrayList<MigrationThread> migrationThreads = new ArrayList<MigrationThread>(numConsumers);
        final ArrayList<ProducerThread> producerThreads = new ArrayList<ProducerThread>(numProducers);
        try {
            File kafkaJar_07 = new File(kafkaJarFile_07);
            File zkClientJar = new File(zkClientJarFile);
            // Resolve the consumer-side classes from the two supplied jars before the parent loader.
            ParentLastURLClassLoader c1 = new ParentLastURLClassLoader(new URL[]{kafkaJar_07.toURI().toURL(), zkClientJar.toURI().toURL()});
            ConsumerConfig_07 = c1.loadClass(KAFKA_07_CONSUMER_CONFIG_CLASS_NAME);
            KafkaStaticConsumer_07 = c1.loadClass(KAFKA_07_STATIC_CONSUMER_CLASS_NAME);
            ConsumerConnector_07 = c1.loadClass(KAFKA_07_CONSUMER_CONNECTOR_CLASS_NAME);
            KafkaStream_07 = c1.loadClass(KAFKA_07_CONSUMER_STREAM_CLASS_NAME);
            TopicFilter_07 = c1.loadClass(KAFKA_07_TOPIC_FILTER_CLASS_NAME);
            WhiteList_07 = c1.loadClass(KAFKA_07_WHITE_LIST_CLASS_NAME);
            BlackList_07 = c1.loadClass(KAFKA_07_BLACK_LIST_CLASS_NAME);
            KafkaMessageClass_07 = c1.loadClass(KAFKA_07_MESSAGE_CLASS_NAME);
            KafkaConsumerIteratorClass_07 = c1.loadClass(KAFKA_07_CONSUMER_ITERATOR_CLASS_NAME);
            KafkaMessageAndMetatDataClass_07 = c1.loadClass(KAFKA_07_MESSAGE_AND_METADATA_CLASS_NAME);
            Constructor<?> ConsumerConfigConstructor_07 = ConsumerConfig_07.getConstructor(Properties.class);
            Properties kafkaConsumerProperties_07 = KafkaMigrationTool.loadProperties(consumerConfigFile_07);
            if (kafkaConsumerProperties_07.getProperty("shallow.iterator.enable", "").equals("true")) {
                log.warn("Shallow iterator should not be used in the migration tool");
                kafkaConsumerProperties_07.setProperty("shallow.iterator.enable", "false");
            }
            Object consumerConfig_07 = ConsumerConfigConstructor_07.newInstance(kafkaConsumerProperties_07);
            Method ConsumerConnectorCreationMethod_07 = KafkaStaticConsumer_07.getMethod("createJavaConsumerConnector", ConsumerConfig_07);
            final Object consumerConnector_07 = ConsumerConnectorCreationMethod_07.invoke(null, consumerConfig_07);
            Method ConsumerConnectorCreateMessageStreamsMethod_07 = ConsumerConnector_07.getMethod("createMessageStreamsByFilter", TopicFilter_07, Integer.TYPE);
            final Method ConsumerConnectorShutdownMethod_07 = ConsumerConnector_07.getMethod("shutdown", new Class[0]);
            Constructor<?> WhiteListConstructor_07 = WhiteList_07.getConstructor(String.class);
            Constructor<?> BlackListConstructor_07 = BlackList_07.getConstructor(String.class);
            Object filterSpec = options.has(whitelistOpt) ? WhiteListConstructor_07.newInstance(options.valueOf(whitelistOpt)) : BlackListConstructor_07.newInstance(options.valueOf(blacklistOpt));
            Object retKafkaStreams = ConsumerConnectorCreateMessageStreamsMethod_07.invoke(consumerConnector_07, filterSpec, numConsumers);
            Properties kafkaProducerProperties_08 = KafkaMigrationTool.loadProperties(producerConfigFile_08);
            // Payloads are forwarded as raw bytes, so any serializer from the config file is overridden.
            kafkaProducerProperties_08.setProperty("serializer.class", "org.graylog.shaded.kafka09.kafka.serializer.DefaultEncoder");
            int queueSize = options.valueOf(queueSizeOpt);
            ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel = new ProducerDataChannel<KeyedMessage<byte[], byte[]>>(queueSize);
            int threadId = 0;
            // Registered before any worker starts; the hook iterates whatever has been added to the lists.
            Runtime.getRuntime().addShutdownHook(new Thread(){

                @Override
                public void run() {
                    try {
                        ConsumerConnectorShutdownMethod_07.invoke(consumerConnector_07, new Object[0]);
                    }
                    catch (Exception e) {
                        log.error("Error while shutting down Kafka consumer", e);
                    }
                    for (MigrationThread migrationThread : migrationThreads) {
                        migrationThread.shutdown();
                    }
                    // Enqueue every shutdown marker first, then wait for each producer thread.
                    for (ProducerThread producerThread : producerThreads) {
                        producerThread.shutdown();
                    }
                    for (ProducerThread producerThread : producerThreads) {
                        producerThread.awaitShutdown();
                    }
                    log.info("Kafka migration tool shutdown successfully");
                }
            });
            for (Object stream : (List<?>)retKafkaStreams) {
                MigrationThread thread = new MigrationThread(stream, producerDataChannel, threadId);
                ++threadId;
                thread.start();
                migrationThreads.add(thread);
            }
            // NOTE(review): when "client.id" is absent from the producer config this is null and
            // the generated ids become "null-0", "null-1", ... - confirm that is acceptable.
            String clientId = kafkaProducerProperties_08.getProperty("client.id");
            for (int i = 0; i < numProducers; ++i) {
                kafkaProducerProperties_08.put("client.id", clientId + "-" + i);
                ProducerConfig producerConfig_08 = new ProducerConfig(kafkaProducerProperties_08);
                Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(producerConfig_08);
                ProducerThread producerThread = new ProducerThread(producerDataChannel, producer, i);
                producerThread.start();
                producerThreads.add(producerThread);
            }
        }
        catch (Throwable e) {
            System.out.println("Kafka migration tool failed due to: " + Utils.stackTrace(e));
            log.error("Kafka migration tool failed: ", e);
        }
    }

    /**
     * Reads a properties file and closes the underlying stream afterwards.
     *
     * @param path location of the properties file
     * @return the properties read from {@code path}
     * @throws IOException if the file cannot be opened, read or closed
     */
    private static Properties loadProperties(String path) throws IOException {
        Properties properties = new Properties();
        FileInputStream in = new FileInputStream(path);
        try {
            properties.load(in);
        }
        finally {
            // Properties.load leaves the stream open, so release the file handle here.
            in.close();
        }
        return properties;
    }

    /**
     * Ensures that each option in {@code required} is present in {@code options}.
     *
     * <p>For a missing option the method writes a message and the parser's help text to
     * standard error and then terminates the JVM with exit status 1.
     *
     * @param parser   parser whose help text is printed when an option is missing
     * @param options  parsed command line
     * @param required options that must be present
     * @throws IOException if the help text cannot be written
     */
    private static void checkRequiredArgs(OptionParser parser, OptionSet options, OptionSpec[] required) throws IOException {
        int index = 0;
        while (index < required.length) {
            OptionSpec candidate = required[index++];
            if (!options.has(candidate)) {
                System.err.println("Missing required argument \"" + candidate + "\"");
                parser.printHelpOn(System.err);
                System.exit(1);
            }
        }
    }

    /**
     * Class loader that looks a class up in its own URLs first and only falls back to the
     * regular parent chain (the creating thread's context class loader) when the URLs do not
     * contain it.
     */
    private static class ParentLastURLClassLoader
    extends ClassLoader {
        private final ChildURLClassLoader childClassLoader;

        /**
         * @param urls locations that are searched before the parent class loader
         */
        public ParentLastURLClassLoader(URL[] urls) {
            super(Thread.currentThread().getContextClassLoader());
            this.childClassLoader = new ChildURLClassLoader(urls, new FindClassClassLoader(this.getParent()));
        }

        /**
         * Tries the child loader first; if neither the URLs nor the child's fallback know the
         * class, the standard parent-first lookup of {@link ClassLoader} is used.
         */
        @Override
        protected synchronized Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            try {
                return this.childClassLoader.findClass(name);
            }
            catch (ClassNotFoundException e) {
                return super.loadClass(name, resolve);
            }
        }

        /**
         * URL loader without a parent of its own; classes missing from its URLs are requested
         * from {@code realParent} instead.
         */
        private static class ChildURLClassLoader
        extends URLClassLoader {
            private final FindClassClassLoader realParent;

            public ChildURLClassLoader(URL[] urls, FindClassClassLoader realParent) {
                super(urls, (ClassLoader)null);
                this.realParent = realParent;
            }

            @Override
            public Class<?> findClass(String name) throws ClassNotFoundException {
                // URLClassLoader.findClass defines the class on every call. Asking again for a
                // name this loader already defined (a repeated lookup, or a class that was pulled
                // in earlier as a superclass) would fail with a duplicate-definition LinkageError,
                // so reuse the existing definition when there is one.
                Class<?> alreadyDefined = this.findLoadedClass(name);
                if (alreadyDefined != null) {
                    return alreadyDefined;
                }
                try {
                    return super.findClass(name);
                }
                catch (ClassNotFoundException e) {
                    return this.realParent.loadClass(name);
                }
            }
        }

        /**
         * Thin wrapper around the real parent loader that widens {@code findClass} to public.
         */
        private static class FindClassClassLoader
        extends ClassLoader {
            public FindClassClassLoader(ClassLoader parent) {
                super(parent);
            }

            @Override
            public Class<?> findClass(String name) throws ClassNotFoundException {
                return super.findClass(name);
            }
        }
    }

    /**
     * Worker that takes messages from the shared {@link ProducerDataChannel} and sends them
     * with its own producer until it dequeues an element equal to its shutdown marker.
     */
    static class ProducerThread
    extends Thread {
        private final ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel;
        private final Producer<byte[], byte[]> producer;
        private final int threadId;
        private final String threadName;
        private final Logger logger;
        // Counted down once when run() leaves, whether normally or through a failure.
        private final CountDownLatch shutdownComplete = new CountDownLatch(1);
        // Marker element placed on the channel by shutdown(). The type arguments have to match
        // the declared type exactly because generic types are invariant.
        private final KeyedMessage<byte[], byte[]> shutdownMessage = new KeyedMessage<byte[], byte[]>("shutdown", null, null);

        /**
         * @param _producerDataChannel channel the thread reads messages from
         * @param _producer            producer used to send every dequeued message
         * @param _threadId            number used to build the thread name
         */
        public ProducerThread(ProducerDataChannel<KeyedMessage<byte[], byte[]>> _producerDataChannel, Producer<byte[], byte[]> _producer, int _threadId) {
            this.producerDataChannel = _producerDataChannel;
            this.producer = _producer;
            this.threadId = _threadId;
            this.threadName = "ProducerThread-" + this.threadId;
            this.logger = Logger.getLogger(ProducerThread.class.getName());
            this.setName(this.threadName);
        }

        /**
         * Forwards dequeued messages to the producer until the shutdown marker is received.
         * Any failure is logged as fatal and ends the loop; the latch is released either way.
         */
        @Override
        public void run() {
            try {
                KeyedMessage<byte[], byte[]> data;
                while (!(data = this.producerDataChannel.receiveRequest()).equals(this.shutdownMessage)) {
                    this.producer.send(data);
                    if (!this.logger.isDebugEnabled()) continue;
                    this.logger.debug(String.format("Sending message %s", new String(data.message())));
                }
                this.logger.info("Producer thread " + this.threadName + " finished running");
            }
            catch (Throwable t) {
                this.logger.fatal("Producer thread failure due to ", t);
            }
            finally {
                this.shutdownComplete.countDown();
            }
        }

        /**
         * Requests termination by enqueueing the shutdown marker; does not wait for the thread.
         */
        public void shutdown() {
            try {
                this.logger.info("Producer thread " + this.threadName + " shutting down");
                this.producerDataChannel.sendRequest(this.shutdownMessage);
            }
            catch (InterruptedException ie) {
                // NOTE(review): the interrupt status is not restored here; the shutdown hook in
                // main() keeps calling the remaining threads afterwards - confirm this is intended.
                this.logger.warn("Interrupt during shutdown of ProducerThread", ie);
            }
        }

        /**
         * Blocks until run() has finished and then closes the producer. If the wait is
         * interrupted the producer is not closed.
         */
        public void awaitShutdown() {
            try {
                this.shutdownComplete.await();
                this.producer.close();
                this.logger.info("Producer thread " + this.threadName + " shutdown complete");
            }
            catch (InterruptedException ie) {
                this.logger.warn("Interrupt during shutdown of ProducerThread", ie);
            }
        }
    }

    /**
     * Worker that drains one reflectively accessed consumer stream and puts every message,
     * copied into a byte array, onto the shared {@link ProducerDataChannel}.
     */
    private static class MigrationThread
    extends Thread {
        private final Object stream;
        private final ProducerDataChannel<KeyedMessage<byte[], byte[]>> producerDataChannel;
        private final int threadId;
        private final String threadName;
        private final Logger logger;
        // Counted down once when run() leaves, whether normally or through a failure.
        private final CountDownLatch shutdownComplete = new CountDownLatch(1);
        // NOTE(review): set to false by shutdown() but not read anywhere in this class; run()
        // only stops when the iterator reports no more elements or an exception is thrown.
        private final AtomicBoolean isRunning = new AtomicBoolean(true);

        /**
         * @param _stream              consumer stream object obtained through reflection
         * @param _producerDataChannel channel the converted messages are written to
         * @param _threadId            number used to build the thread name and in log output
         */
        MigrationThread(Object _stream, ProducerDataChannel<KeyedMessage<byte[], byte[]>> _producerDataChannel, int _threadId) {
            this.stream = _stream;
            this.producerDataChannel = _producerDataChannel;
            this.threadId = _threadId;
            this.threadName = "MigrationThread-" + this.threadId;
            this.logger = Logger.getLogger(MigrationThread.class.getName());
            this.setName(this.threadName);
        }

        /**
         * Iterates the stream through reflection and forwards each message. Failures are logged
         * as fatal (for reflective calls the underlying cause is logged) and end the loop; the
         * latch is released either way.
         */
        @Override
        public void run() {
            try {
                // Look the accessors up once; the class handles were assigned by main().
                Method MessageGetPayloadMethod_07 = KafkaMessageClass_07.getMethod("payload", new Class[0]);
                Method KafkaGetMessageMethod_07 = KafkaMessageAndMetatDataClass_07.getMethod("message", new Class[0]);
                Method KafkaGetTopicMethod_07 = KafkaMessageAndMetatDataClass_07.getMethod("topic", new Class[0]);
                Method ConsumerIteratorMethod = KafkaStream_07.getMethod("iterator", new Class[0]);
                Method KafkaStreamHasNextMethod_07 = KafkaConsumerIteratorClass_07.getMethod("hasNext", new Class[0]);
                Method KafkaStreamNextMethod_07 = KafkaConsumerIteratorClass_07.getMethod("next", new Class[0]);
                Object iterator = ConsumerIteratorMethod.invoke(this.stream, new Object[0]);
                while (((Boolean)KafkaStreamHasNextMethod_07.invoke(iterator, new Object[0])).booleanValue()) {
                    Object messageAndMetaData_07 = KafkaStreamNextMethod_07.invoke(iterator, new Object[0]);
                    Object message_07 = KafkaGetMessageMethod_07.invoke(messageAndMetaData_07, new Object[0]);
                    Object topic = KafkaGetTopicMethod_07.invoke(messageAndMetaData_07, new Object[0]);
                    ByteBuffer payload_07 = (ByteBuffer)MessageGetPayloadMethod_07.invoke(message_07, new Object[0]);
                    // Copy the remaining bytes of the payload buffer into a standalone array.
                    byte[] bytes = new byte[payload_07.remaining()];
                    payload_07.get(bytes);
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("Migration thread " + this.threadId + " sending message of size " + bytes.length + " to topic " + topic);
                    }
                    // No key is carried over; the type arguments must match the channel's element type.
                    KeyedMessage<byte[], byte[]> producerData = new KeyedMessage<byte[], byte[]>((String)topic, null, bytes);
                    this.producerDataChannel.sendRequest(producerData);
                }
                this.logger.info("Migration thread " + this.threadName + " finished running");
            }
            catch (InvocationTargetException t) {
                this.logger.fatal("Migration thread failure due to root cause ", t.getCause());
            }
            catch (Throwable t) {
                this.logger.fatal("Migration thread failure due to ", t);
            }
            finally {
                this.shutdownComplete.countDown();
            }
        }

        /**
         * Interrupts the thread and blocks until run() has released the latch. An interrupt of
         * the calling thread while waiting is logged and ends the wait.
         */
        public void shutdown() {
            this.logger.info("Migration thread " + this.threadName + " shutting down");
            this.isRunning.set(false);
            this.interrupt();
            try {
                this.shutdownComplete.await();
            }
            catch (InterruptedException ie) {
                this.logger.warn("Interrupt during shutdown of MigrationThread", ie);
            }
            this.logger.info("Migration thread " + this.threadName + " shutdown complete");
        }
    }

    /**
     * Bounded FIFO hand-off between the threads that enqueue elements and the threads that
     * dequeue them, backed by an {@link ArrayBlockingQueue} of fixed capacity.
     *
     * @param <T> element type carried by the channel
     */
    static class ProducerDataChannel<T> {
        private final int producerQueueSize;
        private final BlockingQueue<T> producerRequestQueue;

        /**
         * @param queueSize capacity of the backing queue
         */
        public ProducerDataChannel(int queueSize) {
            BlockingQueue<T> boundedQueue = new ArrayBlockingQueue<T>(queueSize);
            this.producerQueueSize = queueSize;
            this.producerRequestQueue = boundedQueue;
        }

        /**
         * Enqueues {@code data}, blocking while the channel is full.
         *
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public void sendRequest(T data) throws InterruptedException {
            final BlockingQueue<T> queue = this.producerRequestQueue;
            queue.put(data);
        }

        /**
         * Dequeues the oldest element, blocking while the channel is empty.
         *
         * @return the element at the head of the channel
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public T receiveRequest() throws InterruptedException {
            final T head = this.producerRequestQueue.take();
            return head;
        }
    }
}

