/*
 * Decompiled with CFR 0.152.
 */
package com.efficient.elasticsearch.flink;

import cn.hutool.core.bean.BeanUtil;
import com.efficient.common.util.JackSonUtil;
import com.efficient.elasticsearch.properties.ElasticSearchProperties;
import com.efficient.elasticsearch.properties.FlinkProperties;
import com.efficient.elasticsearch.service.ElasticSearchService;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.TimeValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ElasticsearchSink
extends RichSinkFunction<String> {
    private static final Logger log = LoggerFactory.getLogger(ElasticsearchSink.class);
    private final ElasticSearchProperties elasticSearchProperties;
    private final String indexName;
    private final FlinkProperties flinkProperties;
    private ElasticSearchService elasticSearchService;
    private RestHighLevelClient client;
    private transient BulkProcessor bulkProcessor;

    public ElasticsearchSink(ElasticSearchProperties elasticSearchProperties, String indexName) {
        this.elasticSearchProperties = (ElasticSearchProperties)BeanUtil.copyProperties((Object)elasticSearchProperties, ElasticSearchProperties.class, (String[])new String[0]);
        this.indexName = indexName;
        this.flinkProperties = new FlinkProperties();
    }

    public ElasticsearchSink(ElasticSearchProperties elasticSearchProperties, FlinkProperties flinkProperties, String indexName) {
        this.elasticSearchProperties = (ElasticSearchProperties)BeanUtil.copyProperties((Object)elasticSearchProperties, ElasticSearchProperties.class, (String[])new String[0]);
        this.indexName = indexName;
        this.flinkProperties = flinkProperties;
    }

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.elasticSearchService = new ElasticSearchService();
        this.elasticSearchService.init(this.elasticSearchProperties);
        this.client = this.elasticSearchService.restHighLevelClient;
        BulkProcessor.Listener listener = new BulkProcessor.Listener(){

            public void beforeBulk(long executionId, BulkRequest request) {
                log.info("Executing bulk [{}] with {} requests", (Object)executionId, (Object)request.numberOfActions());
            }

            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                if (response.hasFailures()) {
                    log.error("Bulk [{}] executed with failures", (Object)executionId);
                } else {
                    log.info("Bulk [{}] completed in {} milliseconds", (Object)executionId, (Object)response.getTook().getMillis());
                }
            }

            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                log.error("Failed to execute bulk", failure);
            }
        };
        BulkProcessor.Builder bulkProcessorBuilder = BulkProcessor.builder((request, bulkListener) -> {
            this.client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
            request.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        }, (BulkProcessor.Listener)listener);
        bulkProcessorBuilder.setBulkActions(this.flinkProperties.getBulkActions().intValue()).setBulkSize(new ByteSizeValue((long)this.flinkProperties.getBulkSize().intValue(), ByteSizeUnit.MB)).setFlushInterval(TimeValue.timeValueSeconds((long)this.flinkProperties.getFlushInterval().intValue())).setConcurrentRequests(this.flinkProperties.getConcurrentRequests().intValue()).setBackoffPolicy(BackoffPolicy.exponentialBackoff((TimeValue)TimeValue.timeValueMillis((long)this.flinkProperties.getTryWaitTime().intValue()), (int)this.flinkProperties.getTryCount()));
        this.bulkProcessor = bulkProcessorBuilder.build();
    }

    public void close() throws Exception {
        super.close();
        if (this.bulkProcessor != null) {
            this.bulkProcessor.flush();
            this.bulkProcessor.awaitClose(1L, TimeUnit.MINUTES);
        }
        if (this.elasticSearchService != null) {
            this.elasticSearchService.destroy();
        }
    }

    public void invoke(String data, SinkFunction.Context context) throws Exception {
        Map document = JackSonUtil.toMap((String)data);
        if (Objects.isNull(document)) {
            return;
        }
        String pkFieldValue = String.valueOf(document.get(this.elasticSearchProperties.getPkFieldName()));
        if (pkFieldValue == null) {
            log.error("Document missing primary key field.");
            return;
        }
        IndexRequest indexRequest = new IndexRequest(this.indexName).id(pkFieldValue).source(document);
        this.bulkProcessor.add(indexRequest);
    }
}

