package org.elasticsearch.hadoop.handler.impl.elasticsearch;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.hadoop.cfg.CompositeSettings;
import org.elasticsearch.hadoop.cfg.PropertiesSettings;
import org.elasticsearch.hadoop.cfg.Settings;
import org.elasticsearch.hadoop.handler.ErrorCollector;
import org.elasticsearch.hadoop.handler.ErrorHandler;
import org.elasticsearch.hadoop.handler.Exceptional;
import org.elasticsearch.hadoop.handler.HandlerResult;
import org.elasticsearch.hadoop.rest.InitializationUtils;
import org.elasticsearch.hadoop.rest.Resource;
import org.elasticsearch.hadoop.rest.RestClient;
import org.elasticsearch.hadoop.rest.RestRepository;
import org.elasticsearch.hadoop.rest.RestService;
import org.elasticsearch.hadoop.serialization.field.IndexExtractor;
import org.elasticsearch.hadoop.util.Assert;
import org.elasticsearch.hadoop.util.BytesArray;
import org.elasticsearch.hadoop.util.ObjectUtils;
import org.elasticsearch.hadoop.util.SettingsUtils;
import org.elasticsearch.hadoop.util.StringUtils;
import org.elasticsearch.hadoop.util.ecs.ElasticCommonSchema;
import org.elasticsearch.hadoop.util.ecs.MessageTemplate;
import org.elasticsearch.hadoop.util.unit.Booleans;

/* loaded from: input_file:org/elasticsearch/hadoop/handler/impl/elasticsearch/ElasticsearchHandler.class */
/**
 * Error handler that records each failure it receives as a document posted to an
 * Elasticsearch resource, then reports a configurable {@link HandlerResult}.
 *
 * <p>Lifecycle as implemented here: the handler is constructed with the job's root settings
 * and an {@link EventConverter}; {@link #init(Properties)} resolves the handler-specific
 * client configuration and the message template; the write client is created lazily on the
 * first {@link #onError(Exceptional, ErrorCollector)} call; {@link #close()} releases it.
 *
 * @param <I> type of the failure being handled
 * @param <O> type accepted by the error collector
 * @param <C> type of the error collector
 */
public class ElasticsearchHandler<I extends Exceptional, O, C extends ErrorCollector<O>> implements ErrorHandler<I, O, C> {
    private static final Log LOG = LogFactory.getLog(ElasticsearchHandler.class);

    /** Event category set on the message template built in {@link #init(Properties)}. */
    private static final String CONST_EVENT_CATEGORY = "error";

    /** Property naming the result returned after the error document was posted. */
    public static final String CONF_RETURN_VALUE = "return.default";
    /** Default for {@link #CONF_RETURN_VALUE}. */
    public static final String CONF_RETURN_VALUE_DEFAULT = HandlerResult.HANDLED.toString();
    /** Property naming the result returned when the error document could not be posted. */
    public static final String CONF_RETURN_ERROR = "return.error";
    /** Default for {@link #CONF_RETURN_ERROR}. */
    public static final String CONF_RETURN_ERROR_DEFAULT = HandlerResult.ABORT.toString();
    /** Suffix appended (after a dot) to a return property to look up the reason used with {@code PASS}. */
    public static final String CONF_PASS_REASON_SUFFIX = "reason";
    /** Prefix of the properties that are added to the message template as labels. */
    public static final String CONF_LABEL = "label";
    /** Property holding the tags that are tokenized and added to the message template. */
    public static final String CONF_TAGS = "tags";
    /** Shorthand property copied onto {@code client.conf.es.nodes}. */
    public static final String CONF_CLIENT_NODES = "client.nodes";
    /** Shorthand property copied onto {@code client.conf.es.port}. */
    public static final String CONF_CLIENT_PORT = "client.port";
    /** Shorthand property copied onto {@code client.conf.es.resource.write} and {@code client.conf.es.resource}. */
    public static final String CONF_CLIENT_RESOURCE = "client.resource";
    /** Property controlling whether the root settings are layered beneath the client settings; treated as true when absent. */
    public static final String CONF_CLIENT_INHERIT = "client.inherit";
    /** Prefix of the properties that make up the handler's client settings. */
    public static final String CONF_CLIENT_CONF = "client.conf";

    private final Settings rootSettings;
    private final EventConverter<I> eventConverter;

    private HandlerResult returnDefault;
    private String successReason;
    private HandlerResult returnError;
    private String errorReason;
    private Settings clientSettings;
    private MessageTemplate messageTemplate;
    private boolean initialized;
    private Resource endpoint;
    private RestRepository writeClient;

    /**
     * Static factory equivalent to calling the constructor.
     *
     * @param settings root settings of the running job
     * @param eventConverter converter used to configure the template and render error events
     * @return a new, not yet initialized handler
     */
    public static <I extends Exceptional, O, C extends ErrorCollector<O>> ElasticsearchHandler<I, O, C> create(Settings settings, EventConverter<I> eventConverter) {
        return new ElasticsearchHandler<>(settings, eventConverter);
    }

    /**
     * @param settings root settings of the running job
     * @param eventConverter converter used to configure the template and render error events
     */
    public ElasticsearchHandler(Settings settings, EventConverter<I> eventConverter) {
        this.rootSettings = settings;
        this.eventConverter = eventConverter;
    }

    /**
     * Resolves the client settings, target resource, message template and return values
     * from the handler properties.
     *
     * @param properties handler-scoped configuration
     * @throws IllegalArgumentException if the resolved resource contains an index pattern,
     *         or if a configured return value is not a {@link HandlerResult} constant name
     */
    @Override
    public void init(Properties properties) {
        PropertiesSettings handlerSettings = new PropertiesSettings(properties);

        // Inheritance of the root configuration is on unless explicitly configured otherwise.
        String inheritValue = handlerSettings.getProperty(CONF_CLIENT_INHERIT);
        boolean inheritRoot = inheritValue == null || Booleans.parseBoolean(inheritValue);

        // Derive a distinct pooling key for this handler from the job's key, when the job has one.
        if (SettingsUtils.hasJobTransportPoolingKey(this.rootSettings)) {
            String poolingKey = SettingsUtils.getJobTransportPoolingKey(this.rootSettings) + "_" + UUID.randomUUID().toString();
            handlerSettings.setProperty("client.conf.es.internal.transport.pooling.key", poolingKey);
        }

        // Shorthand properties win over their fully qualified "client.conf.*" counterparts.
        resolveProperty(CONF_CLIENT_NODES, "client.conf.es.nodes", handlerSettings);
        resolveProperty(CONF_CLIENT_PORT, "client.conf.es.port", handlerSettings);
        resolveProperty(CONF_CLIENT_RESOURCE, "client.conf.es.resource.write", handlerSettings);
        resolveProperty(CONF_CLIENT_RESOURCE, "client.conf.es.resource", handlerSettings);

        this.clientSettings = handlerSettings.getSettingsView(CONF_CLIENT_CONF);
        Assert.hasText(this.clientSettings.getResourceWrite(), "Could not locate write resource for ES error handler.");

        if (inheritRoot) {
            LOG.info("Elasticsearch Error Handler inheriting root configuration");
            // Handler settings are listed first; root settings are added minus the "es.internal" entries.
            this.clientSettings = new CompositeSettings(Arrays.asList(this.clientSettings, this.rootSettings.excludeFilter("es.internal")));
        } else {
            LOG.info("Elasticsearch Error Handler proceeding without inheriting root configuration options as configured");
        }

        InitializationUtils.discoverAndValidateClusterInfo(this.clientSettings, LOG);

        // The handler posts to a fixed endpoint, so a resource containing an index pattern is rejected.
        Resource resource = new Resource(this.clientSettings, false);
        IndexExtractor indexExtractor = (IndexExtractor) ObjectUtils.instantiate(this.clientSettings.getMappingIndexExtractorClassName(), handlerSettings);
        indexExtractor.compile(resource.toString());
        if (indexExtractor.hasPattern()) {
            throw new IllegalArgumentException(String.format("Cannot use index format within Elasticsearch Error Handler. Format was [%s]", resource.toString()));
        }
        this.endpoint = resource;

        ElasticCommonSchema.TemplateBuilder templateBuilder = new ElasticCommonSchema().buildTemplate().setEventCategory(CONST_EVENT_CATEGORY);
        for (Map.Entry<?, ?> label : handlerSettings.getSettingsView(CONF_LABEL).asProperties().entrySet()) {
            templateBuilder.addLabel(label.getKey().toString(), label.getValue().toString());
        }
        templateBuilder.addTags(StringUtils.tokenize(handlerSettings.getProperty(CONF_TAGS)));
        this.messageTemplate = this.eventConverter.configureTemplate(templateBuilder).build();

        // A reason is only looked up when the corresponding result is PASS.
        this.returnDefault = HandlerResult.valueOf(handlerSettings.getProperty(CONF_RETURN_VALUE, CONF_RETURN_VALUE_DEFAULT));
        this.successReason = HandlerResult.PASS == this.returnDefault
                ? handlerSettings.getProperty(CONF_RETURN_VALUE + "." + CONF_PASS_REASON_SUFFIX)
                : null;

        this.returnError = HandlerResult.valueOf(handlerSettings.getProperty(CONF_RETURN_ERROR, CONF_RETURN_ERROR_DEFAULT));
        this.errorReason = HandlerResult.PASS == this.returnError
                ? handlerSettings.getProperty(CONF_RETURN_ERROR + "." + CONF_PASS_REASON_SUFFIX)
                : null;
    }

    /**
     * Copies the value of a shorthand property onto its fully qualified counterpart when the
     * shorthand has text, warning if both were set.
     *
     * @param shorthandKey shorthand property name
     * @param qualifiedKey fully qualified property name that receives the value
     * @param settings settings to read from and write to
     */
    private void resolveProperty(String shorthandKey, String qualifiedKey, Settings settings) {
        String shorthandValue = settings.getProperty(shorthandKey);
        if (!StringUtils.hasText(shorthandValue)) {
            return;
        }
        if (StringUtils.hasText(settings.getProperty(qualifiedKey))) {
            LOG.warn(String.format("Found both [%s] and [%s] settings during elasticsearch handler init. Continuing with value from [%s] (%s)", shorthandKey, qualifiedKey, shorthandKey, shorthandValue));
        }
        settings.setProperty(qualifiedKey, shorthandValue);
    }

    /**
     * Creates the write client on the first call only.
     *
     * <p>The flag is set before the creation attempt, so a failed attempt is not retried:
     * later calls return immediately and {@link #isOpen()} stays false.
     */
    private void lazyInitWrite() {
        if (this.initialized) {
            return;
        }
        this.initialized = true;
        this.writeClient = RestService.createWriter(this.clientSettings, -1L, 0, LOG).repository;
    }

    /**
     * Posts a document describing the failure and returns the configured result.
     *
     * @param entry the failure to record
     * @param collector collector used to produce the result when it is configured as {@code PASS}
     * @return the configured default result when the document was posted; the configured error
     *         result when no write client is available or any exception occurred
     */
    @Override
    public HandlerResult onError(I entry, C collector) throws Exception {
        try {
            lazyInitWrite();
            if (!isOpen()) {
                return generateResult(this.returnError, this.errorReason, collector);
            }
            putDocument(this.writeClient.getRestClient(), createErrorDocument(entry));
            return generateResult(this.returnDefault, this.successReason, collector);
        } catch (Exception e) {
            // Failing to record the error is reported through the configured error result, not rethrown.
            LOG.error("Could not send error handling data to ES", e);
            return generateResult(this.returnError, this.errorReason, collector);
        }
    }

    /** @return true when a write client has been created */
    private boolean isOpen() {
        return this.writeClient != null;
    }

    /** Renders the failure into a document using the template configured in {@link #init(Properties)}. */
    private BytesArray createErrorDocument(I entry) throws IOException {
        return this.eventConverter.generateEvent(entry, this.messageTemplate);
    }

    /** Posts the document to the endpoint resolved in {@link #init(Properties)}. */
    private void putDocument(RestClient restClient, BytesArray document) throws IOException {
        restClient.postDocument(this.endpoint, document);
    }

    /**
     * Returns {@code result} as is, unless it is {@code PASS}, in which case the collector
     * produces the result from the given reason.
     */
    private HandlerResult generateResult(HandlerResult result, String reason, C collector) {
        if (HandlerResult.PASS == result) {
            return collector.pass(reason);
        }
        return result;
    }

    /**
     * Refreshes the endpoint if the client settings request it, then closes the write client.
     * Does nothing when no write client was created.
     */
    @Override
    public void close() {
        if (!isOpen()) {
            return;
        }
        try {
            if (this.clientSettings.getBatchRefreshAfterWrite()) {
                this.writeClient.getRestClient().refresh(this.endpoint);
            }
        } finally {
            // Close even when the refresh fails so the client is not leaked.
            this.writeClient.close();
        }
    }
}
