/*
 * Decompiled with CFR 0.152.
 */
package org.kitesdk.cli.commands;

import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.kitesdk.cli.commands.BaseDatasetCommand;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetException;
import org.kitesdk.data.spi.DatasetRepositories;
import org.slf4j.Logger;

@Parameters(commandDescription="Build a Flume config to log events to a dataset")
public class FlumeConfigCommand
extends BaseDatasetCommand {
    @Parameter(description="Dataset name or URI", required=true)
    List<String> datasetName;
    @Parameter(names={"--use-dataset-uri"}, description="Configure Flume with a dataset URI. Requires Flume 1.6+")
    boolean newFlume;
    @Parameter(names={"--agent"}, description="Flume agent name")
    String agent = "tier1";
    @Parameter(names={"--source"}, description="Flume source name")
    String sourceName = "avro-event-source";
    @Parameter(names={"--bind"}, description="Avro source bind address")
    String bindAddress = "0.0.0.0";
    @Parameter(names={"--port"}, description="Avro source port")
    int port = 41415;
    @Parameter(names={"--channel"}, description="Flume channel name")
    String channelName = "avro-event-channel";
    @Parameter(names={"--channel-type"}, description="Flume channel type")
    String channelType = "file";
    @Parameter(names={"--channel-capacity"}, description="Flume channel capacity")
    Integer capacity = null;
    int defaultMemoryChannelCapacity = 10000000;
    @Parameter(names={"--channel-transaction-capacity"}, description="Flume channel transaction capacity")
    Integer transactionCapacity = null;
    int defaultMemoryChannelTransactionCapacity = 1000;
    @Parameter(names={"--checkpoint-dir"}, description="File channel checkpoint directory")
    String checkpointDir = null;
    @Parameter(names={"--data-dir"}, description="File channel data directory, use the option multiple times for multiple data directories")
    List<String> dataDirs = null;
    @Parameter(names={"--sink"}, description="Avro sink name")
    String sinkName = "kite-dataset";
    @Parameter(names={"--batch-size"}, description="Records to write per batch")
    int batchSize = 1000;
    @Parameter(names={"--roll-interval"}, description="Time in seconds before starting the next file")
    int rollInterval = 30;
    @Parameter(names={"--proxy-user"}, description="User to write to HDFS as")
    String proxyUser = null;
    @Parameter(names={"-o", "--output"}, description="Save logging config to path")
    @SuppressWarnings(value={"UWF_NULL_FIELD"}, justification="Field set by JCommander")
    String outputPath = null;

    public FlumeConfigCommand(Logger console) {
        super(console);
    }

    @Override
    @SuppressWarnings(value={"NP_NULL_ON_SOME_PATH"}, justification="Null case checked by precondition")
    public int run() throws IOException {
        Preconditions.checkArgument((this.datasetName != null && !this.datasetName.isEmpty() ? 1 : 0) != 0, (Object)"Missing dataset uri");
        Preconditions.checkArgument((!"file".equals(this.channelType) || this.checkpointDir != null && this.dataDirs != null && !this.dataDirs.isEmpty() ? 1 : 0) != 0, (Object)"--checkpoint-dir and --data-dir are required options when the channel type is 'file'");
        Dataset dataset = this.load(this.datasetName.get(0), GenericRecord.class).getDataset();
        String datasetUri = this.buildDatasetUri(this.datasetName.get(0));
        URI repoUri = this.getLegacyRepoUri((Dataset<GenericRecord>)dataset);
        String name = dataset.getName();
        StringBuilder sb = new StringBuilder();
        sb.append(this.agent).append(".sources = ").append(this.sourceName).append('\n');
        sb.append(this.agent).append(".channels = ").append(this.channelName).append('\n');
        sb.append(this.agent).append(".sinks = ").append(this.sinkName).append('\n');
        sb.append('\n');
        sb.append(this.agent).append(".sources.").append(this.sourceName).append(".type = avro").append('\n');
        sb.append(this.agent).append(".sources.").append(this.sourceName).append(".channels = ").append(this.channelName).append('\n');
        sb.append(this.agent).append(".sources.").append(this.sourceName).append(".bind = ").append(this.bindAddress).append('\n');
        sb.append(this.agent).append(".sources.").append(this.sourceName).append(".port = ").append(this.port).append('\n');
        sb.append('\n');
        sb.append(this.agent).append(".channels.").append(this.channelName).append(".type = ").append(this.channelType).append('\n');
        if ("memory".equals(this.channelType) && this.capacity == null) {
            this.capacity = this.defaultMemoryChannelCapacity;
        }
        if (this.capacity != null) {
            sb.append(this.agent).append(".channels.").append(this.channelName).append(".capacity = ").append(this.capacity).append('\n');
        }
        if ("memory".equals(this.channelType) && this.transactionCapacity == null) {
            this.transactionCapacity = this.defaultMemoryChannelTransactionCapacity;
        }
        if (this.transactionCapacity != null) {
            sb.append(this.agent).append(".channels.").append(this.channelName).append(".transactionCapacity = ").append(this.transactionCapacity).append('\n');
        }
        if ("file".equals(this.channelType)) {
            sb.append(this.agent).append(".channels.").append(this.channelName).append(".checkpointDir = ").append(this.checkpointDir).append('\n');
        }
        if ("file".equals(this.channelType)) {
            sb.append("\n");
            sb.append("# A list of directories where Flume will persist records that are waiting to be\n");
            sb.append("# processed by the sink. You can use multiple directories on different physical\n");
            sb.append("# disks to increase throughput.\n");
            sb.append(this.agent).append(".channels.").append(this.channelName).append(".dataDirs = ");
            boolean first = true;
            for (String dataDir : this.dataDirs) {
                if (!first) {
                    sb.append(", ");
                }
                sb.append(dataDir);
                first = false;
            }
            sb.append('\n');
        }
        sb.append('\n');
        sb.append(this.agent).append(".sinks.").append(this.sinkName).append(".type = org.apache.flume.sink.kite.DatasetSink").append('\n');
        sb.append(this.agent).append(".sinks.").append(this.sinkName).append(".channel = ").append(this.channelName).append('\n');
        if (this.newFlume) {
            sb.append(this.agent).append(".sinks.").append(this.sinkName).append(".kite.dataset.uri = ").append(datasetUri).append('\n');
        } else {
            sb.append(this.agent).append(".sinks.").append(this.sinkName).append(".kite.repo.uri = ").append(repoUri).append('\n');
            sb.append(this.agent).append(".sinks.").append(this.sinkName).append(".kite.dataset.name = ").append(name).append('\n');
        }
        sb.append(this.agent).append(".sinks.").append(this.sinkName).append(".kite.batchSize = ").append(this.batchSize).append('\n');
        sb.append(this.agent).append(".sinks.").append(this.sinkName).append(".kite.rollInterval = ").append(this.rollInterval).append('\n');
        if (this.proxyUser != null) {
            sb.append(this.agent).append(".sinks.").append(this.sinkName).append(".auth.proxyUser = ").append(this.proxyUser).append('\n');
        }
        this.output(sb.toString(), this.console, this.outputPath);
        return 0;
    }

    private URI getLegacyRepoUri(Dataset<GenericRecord> dataset) {
        return this.getLegacyRepoUri(dataset.getUri(), dataset.getNamespace());
    }

    @VisibleForTesting
    URI getLegacyRepoUri(URI datasetUri, String namespace) {
        URI repoUri = DatasetRepositories.repositoryFor((URI)datasetUri).getUri();
        URI specificUri = URI.create(repoUri.getSchemeSpecificPart());
        String repoScheme = specificUri.getScheme();
        if (Sets.newHashSet((Object[])new String[]{"hdfs", "file", "hive"}).contains(repoScheme)) {
            try {
                specificUri = new URI(specificUri.getScheme(), specificUri.getUserInfo(), specificUri.getHost(), specificUri.getPort(), specificUri.getPath() + "/" + namespace, specificUri.getQuery(), specificUri.getFragment());
                repoUri = URI.create("repo:" + specificUri.toString());
            }
            catch (URISyntaxException ex) {
                throw new DatasetException("Error generating legacy URI", (Throwable)ex);
            }
        }
        return repoUri;
    }

    @Override
    public List<String> getExamples() {
        return Lists.newArrayList((Object[])new String[]{"# Print Flume configuration to log to dataset \"users\":", "--checkpoint-dir /data/0/flume/checkpoint --data-dir /data/1/flume/data users", "# Print Flume configuration to log to dataset \"dataset:hdfs:/datasets/default/users\":", "--channel-type memory dataset:hdfs:/datasets/default/users", "# Save Flume configuration to the file \"flume.properties\":", "--channel-type memory -o flume.properties users"});
    }
}

