/*
 * Decompiled with CFR 0.152.
 */
package org.zalando.nakadiproducer.transmission.impl;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.time.Clock;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.transaction.Transactional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.zalando.fahrschein.EventPublishingException;
import org.zalando.fahrschein.domain.BatchItemResponse;
import org.zalando.nakadiproducer.eventlog.impl.EventLog;
import org.zalando.nakadiproducer.eventlog.impl.EventLogRepository;
import org.zalando.nakadiproducer.transmission.NakadiPublishingClient;
import org.zalando.nakadiproducer.transmission.impl.EventBatcher;
import org.zalando.nakadiproducer.transmission.impl.NakadiEvent;
import org.zalando.nakadiproducer.transmission.impl.NakadiMetadata;

/**
 * Locks entries of the event log, publishes them through the
 * {@link NakadiPublishingClient} and deletes the entries that were published
 * successfully.
 */
public class EventTransmissionService {
    private static final Logger log = LoggerFactory.getLogger(EventTransmissionService.class);

    /** Number of minutes added to "now" to compute the lock expiry passed to the repository. */
    private static final long LOCK_DURATION_MINUTES = 10L;

    /** Events whose lock expires within this many minutes are skipped instead of being sent. */
    private static final long LOCK_EXPIRY_SAFETY_MARGIN_MINUTES = 1L;

    private final EventLogRepository eventLogRepository;
    private final NakadiPublishingClient nakadiPublishingClient;
    private final ObjectMapper objectMapper;

    /** Source of the current time; replaceable through {@link #overrideClock(Clock)}. */
    private Clock clock = Clock.systemDefaultZone();

    /**
     * Creates the service.
     *
     * @param eventLogRepository     repository used to lock, look up and delete event log entries
     * @param nakadiPublishingClient client the event batches are published with
     * @param objectMapper           mapper used to parse the stored event body and handed to the batcher
     */
    public EventTransmissionService(EventLogRepository eventLogRepository, NakadiPublishingClient nakadiPublishingClient, ObjectMapper objectMapper) {
        this.eventLogRepository = eventLogRepository;
        this.nakadiPublishingClient = nakadiPublishingClient;
        this.objectMapper = objectMapper;
    }

    /**
     * Asks the repository to lock events under a freshly generated lock id and
     * returns the entries carrying that lock id whose lock has not expired yet.
     *
     * @return the event log entries found for the new lock id
     */
    @Transactional
    public Collection<EventLog> lockSomeEvents() {
        String lockId = UUID.randomUUID().toString();
        log.debug("Locking events for replication with lockId {}", lockId);
        // Read the clock once so that the lock start, the lock expiry and the
        // subsequent lookup are all derived from the same instant.
        Instant now = this.now();
        Instant lockedUntil = now.plus(LOCK_DURATION_MINUTES, ChronoUnit.MINUTES);
        this.eventLogRepository.lockSomeMessages(lockId, now, lockedUntil);
        return this.eventLogRepository.findByLockedByAndLockedUntilGreaterThan(lockId, now);
    }

    /**
     * Converts the given event log entries to Nakadi events and pushes them into
     * an {@link EventBatcher} that publishes via {@link #publishBatch(List)}.
     * Entries whose lock is about to expire, and entries whose body cannot be
     * converted, are skipped.
     *
     * @param events the event log entries to send
     */
    @Transactional
    public void sendEvents(Collection<EventLog> events) {
        EventBatcher batcher = new EventBatcher(this.objectMapper, this::publishBatch);
        for (EventLog event : events) {
            if (this.lockNearlyExpired(event)) {
                continue;
            }
            NakadiEvent nakadiEvent;
            try {
                nakadiEvent = this.mapToNakadiEvent(event);
            } catch (Exception e) {
                // Best effort: one broken entry must not stop the remaining ones.
                log.error("Could not serialize event {} of type {}, skipping it.", event.getId(), event.getEventType(), e);
                continue;
            }
            batcher.pushEvent(event, nakadiEvent);
        }
        batcher.finish();
    }

    /**
     * Publishes one batch; any failure is logged and swallowed so that the
     * caller can continue with further batches.
     *
     * @param batch the items to publish; an empty batch is ignored
     */
    private void publishBatch(List<EventBatcher.BatchItem> batch) {
        if (batch.isEmpty()) {
            // Nothing to publish; also keeps the batch.get(0) calls below safe.
            return;
        }
        try {
            this.tryToPublishBatch(batch);
        } catch (Exception e) {
            log.error("Could not send {} events of type {}, skipping them.", batch.size(), batch.get(0).getEventLogEntry().getEventType(), e);
        }
    }

    /**
     * Publishes a non-empty batch under the event type of its first item and
     * deletes the event log entries that were not reported as failed.
     *
     * @param batch the non-empty batch to publish
     * @throws Exception whatever the publishing client or the repository throws,
     *                   except {@link EventPublishingException}, which is handled here
     */
    private void tryToPublishBatch(List<EventBatcher.BatchItem> batch) throws Exception {
        String eventType = batch.get(0).getEventLogEntry().getEventType();
        Stream<EventLog> successfulEvents;
        try {
            List<NakadiEvent> nakadiEvents = batch.stream()
                    .map(EventBatcher.BatchItem::getNakadiEvent)
                    .collect(Collectors.toList());
            this.nakadiPublishingClient.publish(eventType, nakadiEvents);
            successfulEvents = batch.stream().map(EventBatcher.BatchItem::getEventLogEntry);
            log.info("Sent {} events of type {}.", batch.size(), eventType);
        } catch (EventPublishingException e) {
            log.error("{} out of {} events of type {} failed to be sent.", e.getResponses().length, batch.size(), eventType);
            // Every eid listed in the exception's responses is treated as failed
            // and its entry is kept in the event log.
            Collection<String> failedEids = this.collectEids(e);
            successfulEvents = batch.stream()
                    .map(EventBatcher.BatchItem::getEventLogEntry)
                    .filter(rawEvent -> !failedEids.contains(this.convertToUUID(rawEvent.getId())));
        }
        successfulEvents.forEach(this.eventLogRepository::delete);
    }

    /**
     * Collects the eids of all responses carried by the given exception.
     *
     * @param e the publishing exception
     * @return the eids as a set, giving constant-time membership checks
     */
    private Collection<String> collectEids(EventPublishingException e) {
        return Arrays.stream(e.getResponses())
                .map(BatchItemResponse::getEid)
                .collect(Collectors.toSet());
    }

    /**
     * Tells whether the lock of the given entry expires within the safety margin.
     *
     * @param eventLog the locked entry
     * @return {@code true} if "now" is later than the lock expiry minus the safety margin
     */
    private boolean lockNearlyExpired(EventLog eventLog) {
        Instant latestSafeSendTime = eventLog.getLockedUntil().minus(LOCK_EXPIRY_SAFETY_MARGIN_MINUTES, ChronoUnit.MINUTES);
        return this.now().isAfter(latestSafeSendTime);
    }

    /**
     * Builds a {@link NakadiEvent} from an event log entry: the metadata is
     * filled from the entry's id, creation time and flow id, and the data is the
     * entry's body parsed into an insertion-ordered map.
     *
     * @param event the entry to convert
     * @return the resulting Nakadi event
     * @throws IOException if the stored body cannot be parsed
     */
    private NakadiEvent mapToNakadiEvent(EventLog event) throws IOException {
        NakadiMetadata metadata = new NakadiMetadata();
        metadata.setEid(this.convertToUUID(event.getId()));
        metadata.setOccuredAt(event.getCreated());
        metadata.setFlowId(event.getFlowId());

        LinkedHashMap<String, Object> payloadDTO = this.objectMapper.readValue(
                event.getEventBodyData(), new TypeReference<LinkedHashMap<String, Object>>() {});

        NakadiEvent nakadiEvent = new NakadiEvent();
        nakadiEvent.setMetadata(metadata);
        nakadiEvent.setData(payloadDTO);
        return nakadiEvent;
    }

    /** Returns the current instant according to the configured clock. */
    private Instant now() {
        return this.clock.instant();
    }

    /**
     * Replaces the clock used to determine the current time.
     *
     * @param clock the clock to use from now on
     */
    public void overrideClock(Clock clock) {
        this.clock = clock;
    }

    /**
     * Derives the eid string for a numeric event log id: a UUID whose most
     * significant 64 bits are zero and whose least significant 64 bits hold the
     * id (widened to {@code long}).
     *
     * @param number the numeric event log id
     * @return the textual UUID
     */
    private String convertToUUID(int number) {
        return new UUID(0L, number).toString();
    }
}

