package com.github.dapeng.message.serializer;

import com.github.dapeng.core.BeanSerializer;
import com.github.dapeng.message.config.MessageInfo;
import com.github.dapeng.org.apache.thrift.TException;
import com.github.dapeng.org.apache.thrift.protocol.TCompactProtocol;
import com.github.dapeng.util.TCommonTransport;
import com.github.dapeng.util.TKafkaTransport;
import java.nio.ByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/dapeng/message/serializer/KafkaMessageProcessor.class */
public class KafkaMessageProcessor<T> {
    private Logger LOGGER = LoggerFactory.getLogger(KafkaMessageProcessor.class);
    private BeanSerializer<T> beanSerializer;
    private byte[] realMessage;

    public T dealMessage(byte[] bArr, ClassLoader classLoader) throws TException {
        String eventType = getEventType(bArr);
        this.LOGGER.info("fetch eventType: {}", eventType);
        this.beanSerializer = assemblyBeanSerializer(eventType, classLoader);
        T event = parseMessage(bArr).getEvent();
        this.LOGGER.info("dealMessage:event {}", event.toString());
        return event;
    }

    public T decodeMessage(ByteBuffer byteBuffer, BeanSerializer beanSerializer) throws TException {
        this.LOGGER.info("fetch event body: ");
        T t = (T) beanSerializer.read(new TCompactProtocol(new TKafkaTransport(new byte[byteBuffer.remaining()], TCommonTransport.Type.Read)));
        this.LOGGER.info("dealMessage:event {}", t.toString());
        return t;
    }

    private MessageInfo<T> parseMessage(byte[] bArr) throws TException {
        TKafkaTransport tKafkaTransport = new TKafkaTransport(bArr, TCommonTransport.Type.Read);
        return new MessageInfo<>(tKafkaTransport.getEventType(), this.beanSerializer.read(new TCompactProtocol(tKafkaTransport)));
    }

    public byte[] buildMessageByte(T t) throws TException {
        String name = t.getClass().getName();
        this.beanSerializer = assemblyBeanSerializer(name);
        TKafkaTransport tKafkaTransport = new TKafkaTransport(new byte[8192], TCommonTransport.Type.Write);
        TCompactProtocol tCompactProtocol = new TCompactProtocol(tKafkaTransport);
        tKafkaTransport.setEventType(name);
        this.beanSerializer.write(t, tCompactProtocol);
        tKafkaTransport.flush();
        return tKafkaTransport.getByteBuf();
    }

    public String getEventType(byte[] bArr) {
        int i = 0;
        while (i < bArr.length) {
            int i2 = i;
            i++;
            if (bArr[i2] == 0) {
                break;
            }
        }
        byte[] bArr2 = new byte[i - 1];
        System.arraycopy(bArr, 0, bArr2, 0, i - 1);
        this.realMessage = new byte[bArr.length - i];
        System.arraycopy(bArr, i, this.realMessage, 0, bArr.length - i);
        return new String(bArr2);
    }

    public byte[] getEventBinary() {
        return this.realMessage;
    }

    private BeanSerializer assemblyBeanSerializer(String str, ClassLoader classLoader) {
        String str2 = null;
        try {
            str2 = str.substring(0, str.lastIndexOf(".")) + ".serializer." + str.substring(str.lastIndexOf(".") + 1) + "Serializer";
            return (BeanSerializer) classLoader.loadClass(str2).newInstance();
        } catch (ClassNotFoundException e) {
            this.LOGGER.error("找不到对应的消息解码器 {}", str2);
            this.LOGGER.error(e.getMessage(), e);
            return null;
        } catch (IllegalAccessException e2) {
            e2.printStackTrace();
            return null;
        } catch (InstantiationException e3) {
            e3.printStackTrace();
            return null;
        } catch (StringIndexOutOfBoundsException e4) {
            this.LOGGER.error("组装权限定名出错!!");
            this.LOGGER.error(e4.getMessage(), e4);
            return null;
        }
    }

    private BeanSerializer assemblyBeanSerializer(String str) {
        String str2 = null;
        try {
            str2 = str.substring(0, str.lastIndexOf(".")) + ".serializer." + str.substring(str.lastIndexOf(".") + 1) + "Serializer";
            return (BeanSerializer) getClass().getClassLoader().loadClass(str2).newInstance();
        } catch (ClassNotFoundException e) {
            this.LOGGER.error("找不到对应的消息解码器 {}", str2);
            this.LOGGER.error(e.getMessage(), e);
            return null;
        } catch (IllegalAccessException e2) {
            e2.printStackTrace();
            return null;
        } catch (InstantiationException e3) {
            e3.printStackTrace();
            return null;
        } catch (StringIndexOutOfBoundsException e4) {
            this.LOGGER.error("组装权限定名出错!!");
            this.LOGGER.error(e4.getMessage(), e4);
            return null;
        }
    }
}
