package com.datawizards.kafka.streams;

import com.datawizards.kafka.streams.app.KafkaStreamsApplicationBase;
import com.datawizards.model.DeviceUsage;
import com.datawizards.model.UserAction;
import com.datawizards.model.UserProfile;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.streams.kstream.KStreamBuilder;

/* loaded from: input_file:com/datawizards/kafka/streams/ApplicationMain.class */
/**
 * Kafka Streams application that folds the user actions read from {@code user-actions-example}
 * into one {@link UserProfile} per record key and writes the resulting profiles to
 * {@code user-profile-example-v2}.
 *
 * <p>NOTE(review): the {@code m7build()} / {@code m1build()} calls look like decompiler-renamed
 * {@code build()} methods of the generated builders; they are kept unchanged here - confirm
 * against the model classes.
 */
public class ApplicationMain extends KafkaStreamsApplicationBase {
    private static final String INPUT_TOPIC = "user-actions-example";
    private static final String OUTPUT_TOPIC = "user-profile-example-v2";
    private static final String SCHEMA_REGISTRY_URL = "http://localhost:8081";

    /**
     * Entry point: creates the application and runs it.
     *
     * @param strArr command-line arguments (unused)
     * @throws ExecutionException declared by the application's {@code run()} call
     * @throws InterruptedException declared by the application's {@code run()} call
     */
    public static void main(String[] strArr) throws ExecutionException, InterruptedException {
        new ApplicationMain().run();
    }

    /**
     * Wires the topology: input topic, group by key, aggregate into a profile, output topic.
     *
     * @param kStreamBuilder builder the topology is registered on
     */
    protected void buildTopology(KStreamBuilder kStreamBuilder) {
        // Typed serde for the aggregate value; "false" marks it as a value (not key) serde.
        SpecificAvroSerde<UserProfile> profileSerde = new SpecificAvroSerde<>();
        profileSerde.configure(Collections.singletonMap("schema.registry.url", SCHEMA_REGISTRY_URL), false);

        kStreamBuilder.<String, UserAction>stream(INPUT_TOPIC)
                .groupByKey()
                .aggregate(this::emptyProfile, this::aggregateProfile, profileSerde)
                .to(OUTPUT_TOPIC);
    }

    /**
     * Creates the initial aggregate used before the first action of a key is seen.
     * The placeholder user id is replaced with the record key on the first aggregation step.
     *
     * @return a profile whose only explicitly set field is the placeholder user id
     */
    private UserProfile emptyProfile() {
        return UserProfile.newBuilder().setUserId("???").m7build();
    }

    /**
     * Folds one action into the profile accumulated so far and returns a new profile.
     *
     * @param str         record key, stored as the profile's user id
     * @param userAction  action being added
     * @param userProfile profile accumulated so far; it is not modified
     * @return a new profile with the actions count incremented, the last action date taken from
     *         {@code userAction}, and the device usage and favourite device recomputed
     */
    private UserProfile aggregateProfile(String str, UserAction userAction, UserProfile userProfile) {
        List<DeviceUsage> deviceUsage = calculateDeviceUsage(userAction, userProfile.getDeviceUsage());
        // Treat an unset counter as zero instead of failing on unboxing.
        Long previousCount = userProfile.getActionsCount();
        long actionsCount = (previousCount == null ? 0L : previousCount.longValue()) + 1;
        return UserProfile.newBuilder()
                .setUserId(str)
                .setActionsCount(actionsCount)
                .setLastAction(userAction.getEventDate())
                .setDeviceUsage(deviceUsage)
                .setFavouriteDevice(calculateFavouriteDevice(deviceUsage))
                .m7build();
    }

    /**
     * Returns the per-device usage after accounting for one more action.
     *
     * <p>If the action's device is already listed, that entry's count is incremented; otherwise,
     * when the action has a device, a new entry with a count of 1 is appended. Neither the given
     * list nor its elements are modified: the result is always a fresh list, and an incremented
     * entry is a copy of the original one.
     *
     * @param userAction action whose device is being counted
     * @param list       current usage entries; {@code null} is treated as empty
     * @return a new mutable list in the original order, with any new device appended last
     */
    protected List<DeviceUsage> calculateDeviceUsage(UserAction userAction, List<DeviceUsage> list) {
        CharSequence actionDevice = userAction.getDevice();
        List<DeviceUsage> updated = new ArrayList<>(list == null ? 1 : list.size() + 1);
        boolean deviceKnown = false;

        if (list != null) {
            for (DeviceUsage usage : list) {
                if (sameDevice(usage.getDevice(), actionDevice)) {
                    deviceKnown = true;
                    // Copy before incrementing so the previous aggregate stays untouched.
                    updated.add(DeviceUsage.newBuilder(usage)
                            .setActionsCount(usage.getActionsCount().longValue() + 1)
                            .m1build());
                } else {
                    updated.add(usage);
                }
            }
        }

        if (!deviceKnown && actionDevice != null) {
            updated.add(DeviceUsage.newBuilder().setDevice(userAction.getDevice()).setActionsCount(1L).m1build());
        }
        return updated;
    }

    /**
     * Picks the device with the highest actions count.
     *
     * @param list usage entries to inspect
     * @return the device of the entry with the largest count, or {@code null} when the list is
     *         {@code null} or empty
     */
    protected CharSequence calculateFavouriteDevice(List<DeviceUsage> list) {
        if (list == null || list.isEmpty()) {
            return null;
        }
        return list.stream()
                .max(Comparator.comparing(DeviceUsage::getActionsCount))
                .map(DeviceUsage::getDevice)
                .orElse(null);
    }

    /**
     * Compares two device names by character content. {@code equals} is not reliable between
     * different {@link CharSequence} implementations, so the comparison goes through
     * {@link String#contentEquals(CharSequence)}. A {@code null} name never matches.
     */
    private static boolean sameDevice(CharSequence left, CharSequence right) {
        return left != null && right != null && left.toString().contentEquals(right);
    }
}
