package herddb.cdc;

import herddb.client.ClientConfiguration;
import herddb.cluster.BookkeeperCommitLog;
import herddb.cluster.BookkeeperCommitLogManager;
import herddb.cluster.ZookeeperMetadataStorageManager;
import herddb.codec.DataAccessorForFullRecord;
import herddb.log.CommitLog;
import herddb.log.LogEntry;
import herddb.log.LogSequenceNumber;
import herddb.metadata.MetadataStorageManagerException;
import herddb.model.Record;
import herddb.model.Table;
import herddb.server.ServerConfiguration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.bookkeeper.stats.NullStatsLogger;

/* loaded from: input_file:herddb/cdc/ChangeDataCapture.class */
/**
 * Follows the commit log of a tablespace and reports every change it contains
 * to a {@link MutationListener}.
 *
 * <p>Changes that are not part of a transaction are delivered as soon as the
 * corresponding log entry is read. Changes that belong to a transaction are
 * buffered in memory and delivered, in log order, only when the entry that
 * completes the transaction is read; they are discarded if the transaction is
 * abandoned instead.
 *
 * <p>Table definitions are saved to and resolved from a
 * {@link TableSchemaHistoryStorage} supplied by the caller.
 *
 * <p>Typical usage: {@link #start()}, then {@link #run()} (possibly several
 * times), and finally {@link #close()}.
 */
public class ChangeDataCapture implements AutoCloseable {
    /** Upper bound on how long {@link #close()} waits for {@link #run()} to finish. */
    private static final long CLOSE_WAIT_TIMEOUT_MILLIS = 10000L;
    /** Pause between two checks of the running flag inside {@link #close()}. */
    private static final long CLOSE_WAIT_POLL_MILLIS = 100L;

    private final ClientConfiguration configuration;
    private final MutationListener listener;
    private final TableSchemaHistoryStorage tableSchemaHistoryStorage;
    /** Position of the last entry handed to {@link #applyEntry}; the next run resumes from it. */
    private LogSequenceNumber lastPosition;
    private final String tableSpaceUUID;
    private ZookeeperMetadataStorageManager zookeeperMetadataStorageManager;
    private BookkeeperCommitLogManager manager;
    /** Set by {@link #close()}; read by the entry acceptor in {@link #run()}. */
    private volatile boolean closed = false;
    /** True while {@link #run()} is following the log. */
    private volatile boolean running = false;
    /** Buffered state of the transactions currently in flight, keyed by transaction id. */
    private final Map<Long, TransactionHolder> transactions = new HashMap<>();

    /**
     * A single change read from the log, either to the data of a table or to
     * the definition of a table.
     */
    public static class Mutation {
        private final Table table;
        private final MutationType mutationType;
        private final DataAccessorForFullRecord record;
        private final LogSequenceNumber logSequenceNumber;
        private final long timestamp;

        /**
         * @param table the table the change refers to
         * @param mutationType the kind of change
         * @param dataAccessorForFullRecord the affected record; this class passes
         *        {@code null} for table-level changes (create, alter, drop)
         * @param logSequenceNumber position of the log entry that produced the change
         * @param j timestamp copied from the log entry
         */
        public Mutation(Table table, MutationType mutationType, DataAccessorForFullRecord dataAccessorForFullRecord, LogSequenceNumber logSequenceNumber, long j) {
            this.table = table;
            this.mutationType = mutationType;
            this.record = dataAccessorForFullRecord;
            this.logSequenceNumber = logSequenceNumber;
            this.timestamp = j;
        }

        /** @return the table the change refers to */
        public Table getTable() {
            return this.table;
        }

        /** @return the kind of change */
        public MutationType getMutationType() {
            return this.mutationType;
        }

        /** @return the affected record, or {@code null} when none was supplied */
        public DataAccessorForFullRecord getRecord() {
            return this.record;
        }

        /** @return position of the log entry that produced the change */
        public LogSequenceNumber getLogSequenceNumber() {
            return this.logSequenceNumber;
        }

        /** @return timestamp copied from the log entry */
        public long getTimestamp() {
            return this.timestamp;
        }

        @Override
        public String toString() {
            return "Mutation{table=" + this.table + ", mutationType=" + this.mutationType + ", record=" + this.record + ", logSequenceNumber=" + this.logSequenceNumber + ", timestamp=" + this.timestamp + '}';
        }
    }

    /** Receiver of the changes read from the log. */
    @FunctionalInterface
    public interface MutationListener {
        /**
         * Called once per change, from the thread that executes
         * {@link ChangeDataCapture#run()}.
         *
         * @param mutation the change to handle
         */
        void accept(Mutation mutation);
    }

    /** Kinds of change reported through {@link Mutation#getMutationType()}. */
    public enum MutationType {
        INSERT,
        UPDATE,
        DELETE,
        CREATE_TABLE,
        DROP_TABLE,
        ALTER_TABLE
    }

    /**
     * Storage for table definitions, used to find the definition of a table
     * when a data change that refers to it is read from the log.
     */
    public interface TableSchemaHistoryStorage {
        /**
         * Records a table definition.
         *
         * @param logSequenceNumber log position associated with the definition
         * @param table the definition to record
         */
        void storeSchema(LogSequenceNumber logSequenceNumber, Table table);

        /**
         * Retrieves a table definition.
         *
         * @param logSequenceNumber log position of the entry being processed
         * @param str name of the table
         * @return the definition to use for that table
         */
        Table fetchSchema(LogSequenceNumber logSequenceNumber, String str);
    }

    /** Changes and table definitions buffered on behalf of one open transaction. */
    public static class TransactionHolder {
        /** Changes made inside the transaction, in log order. */
        private final List<Mutation> mutations;
        /** Table definitions touched inside the transaction; a {@code null} value marks a dropped table. */
        private final Map<String, Table> tablesDefinitions;

        private TransactionHolder() {
            this.mutations = new ArrayList<>();
            this.tablesDefinitions = new HashMap<>();
        }
    }

    /**
     * Creates an instance; nothing is connected until {@link #start()} is called.
     *
     * @param str UUID of the tablespace whose log is followed
     * @param clientConfiguration source of the ZooKeeper connection settings
     * @param mutationListener receiver of the changes
     * @param logSequenceNumber position from which the log is followed
     * @param tableSchemaHistoryStorage storage of table definitions
     */
    public ChangeDataCapture(String str, ClientConfiguration clientConfiguration, MutationListener mutationListener, LogSequenceNumber logSequenceNumber, TableSchemaHistoryStorage tableSchemaHistoryStorage) {
        this.configuration = clientConfiguration;
        this.listener = mutationListener;
        this.lastPosition = logSequenceNumber;
        this.tableSpaceUUID = str;
        this.tableSchemaHistoryStorage = tableSchemaHistoryStorage;
    }

    /**
     * Starts the metadata storage manager and the commit log manager. Must be
     * called before {@link #run()}.
     *
     * @throws Exception if one of the managers cannot be created or started
     */
    public void start() throws Exception {
        this.zookeeperMetadataStorageManager = buildMetadataStorageManager(this.configuration);
        this.manager = new BookkeeperCommitLogManager(this.zookeeperMetadataStorageManager, new ServerConfiguration(), NullStatsLogger.INSTANCE);
        this.manager.start();
    }

    /**
     * Follows the commit log from the current position, applying every entry
     * that is read, until the underlying {@code followTheLeader} call returns.
     *
     * @return the position of the last entry that was applied, or the initial
     *         position when no entry was read
     * @throws IllegalStateException if {@link #start()} has not been called
     * @throws Exception if reading the log or applying an entry fails
     */
    public LogSequenceNumber run() throws Exception {
        if (this.zookeeperMetadataStorageManager == null) {
            throw new IllegalStateException("not started");
        }
        // try-with-resources closes the commit log on every exit path, including
        // failures raised while entries are being applied; the outer finally
        // clears the running flag after the log has been closed.
        try (BookkeeperCommitLog commitLog = this.manager.createCommitLog(this.tableSpaceUUID, this.tableSpaceUUID, "cdc")) {
            this.running = true;
            commitLog.followTheLeader(this.lastPosition, new CommitLog.EntryAcceptor() {
                @Override
                public boolean accept(LogSequenceNumber logSequenceNumber, LogEntry logEntry) throws Exception {
                    ChangeDataCapture.this.applyEntry(logEntry, logSequenceNumber);
                    ChangeDataCapture.this.lastPosition = logSequenceNumber;
                    // NOTE(review): returning false presumably tells the log to stop
                    // delivering entries - confirm against CommitLog.EntryAcceptor.
                    return !ChangeDataCapture.this.closed;
                }
            }, commitLog.startFollowing(this.lastPosition));
            return this.lastPosition;
        } finally {
            this.running = false;
        }
    }

    /**
     * Requests termination, waits up to ten seconds for a concurrent
     * {@link #run()} to finish, then closes the commit log manager and the
     * metadata storage manager.
     *
     * <p>Both managers are closed even when the wait is interrupted or when
     * closing the first one fails.
     *
     * @throws Exception if the wait is interrupted or a manager fails to close
     */
    @Override
    public void close() throws Exception {
        this.closed = true;
        try {
            long waitStart = System.currentTimeMillis();
            while (this.running && System.currentTimeMillis() - waitStart < CLOSE_WAIT_TIMEOUT_MILLIS) {
                Thread.sleep(CLOSE_WAIT_POLL_MILLIS);
            }
        } finally {
            try {
                if (this.manager != null) {
                    this.manager.close();
                }
            } finally {
                if (this.zookeeperMetadataStorageManager != null) {
                    this.zookeeperMetadataStorageManager.close();
                }
            }
        }
    }

    /**
     * Returns the buffered state of an open transaction.
     *
     * @throws IllegalStateException if no start entry has been seen for the
     *         transaction, for example because the log is being followed from
     *         a position that lies inside the transaction
     */
    private TransactionHolder requireTransaction(long transactionId) {
        TransactionHolder holder = this.transactions.get(transactionId);
        if (holder == null) {
            throw new IllegalStateException("unknown transaction " + transactionId
                    + ": its start entry has not been read from the log");
        }
        return holder;
    }

    /** Buffers the mutation when it belongs to a transaction, delivers it to the listener otherwise. */
    private void fire(Mutation mutation, long j) {
        if (j > 0) {
            requireTransaction(j).mutations.add(mutation);
        } else {
            this.listener.accept(mutation);
        }
    }

    /**
     * Finds the definition of the table named by the entry: a definition created
     * or altered inside the entry's own transaction wins, otherwise the schema
     * history storage is asked.
     */
    private Table lookupTable(LogSequenceNumber logSequenceNumber, LogEntry logEntry) {
        String tableName = logEntry.tableName;
        if (logEntry.transactionId > 0) {
            Table transactional = requireTransaction(logEntry.transactionId).tablesDefinitions.get(tableName);
            if (transactional != null) {
                return transactional;
            }
        }
        return this.tableSchemaHistoryStorage.fetchSchema(logSequenceNumber, tableName);
    }

    /**
     * Handles an entry that carries a serialized table definition: the definition
     * is kept with the transaction, or stored in the schema history when there is
     * no transaction, and then a mutation of the given type is fired.
     */
    private void applyTableDefinition(MutationType mutationType, LogEntry logEntry, LogSequenceNumber logSequenceNumber) {
        Table table = Table.deserialize(logEntry.value.to_array());
        if (logEntry.transactionId > 0) {
            requireTransaction(logEntry.transactionId).tablesDefinitions.put(logEntry.tableName, table);
        } else {
            this.tableSchemaHistoryStorage.storeSchema(logSequenceNumber, table);
        }
        fire(new Mutation(table, mutationType, null, logSequenceNumber, logEntry.timestamp), logEntry.transactionId);
    }

    /** Handles an entry that carries a record: fires a mutation of the given type built from the entry key and value. */
    private void applyRecordChange(MutationType mutationType, LogEntry logEntry, LogSequenceNumber logSequenceNumber) {
        Table table = lookupTable(logSequenceNumber, logEntry);
        DataAccessorForFullRecord record = new DataAccessorForFullRecord(table, new Record(logEntry.key, logEntry.value));
        fire(new Mutation(table, mutationType, record, logSequenceNumber, logEntry.timestamp), logEntry.transactionId);
    }

    /**
     * Completes a transaction: the table definitions it touched are stored in the
     * schema history at the position of the completing entry, then the buffered
     * mutations are delivered to the listener in the order they were read.
     */
    private void commitTransaction(LogEntry logEntry, LogSequenceNumber logSequenceNumber) {
        TransactionHolder holder = this.transactions.remove(logEntry.transactionId);
        if (holder == null) {
            throw new IllegalStateException("unknown transaction " + logEntry.transactionId
                    + ": its start entry has not been read from the log");
        }
        for (Table table : holder.tablesDefinitions.values()) {
            // A null value marks a table dropped inside the transaction: nothing to store.
            if (table != null) {
                this.tableSchemaHistoryStorage.storeSchema(logSequenceNumber, table);
            }
        }
        for (Mutation mutation : holder.mutations) {
            this.listener.accept(mutation);
        }
    }

    /**
     * Applies one log entry, dispatching on its numeric type code. Entry types
     * not listed explicitly are ignored.
     *
     * @param logEntry the entry read from the log
     * @param logSequenceNumber position of the entry in the log
     * @throws IllegalStateException if the entry refers to a transaction whose
     *         start entry has not been seen (abandoning such a transaction is
     *         tolerated and does nothing)
     * @throws Exception if the entry cannot be processed
     */
    public void applyEntry(LogEntry logEntry, LogSequenceNumber logSequenceNumber) throws Exception {
        // NOTE(review): the numeric labels presumably mirror the constants of the
        // project's log entry type definitions - confirm before replacing them.
        switch (logEntry.type) {
            case 1: // new table definition
                applyTableDefinition(MutationType.CREATE_TABLE, logEntry, logSequenceNumber);
                return;
            case 2: // record inserted
                applyRecordChange(MutationType.INSERT, logEntry, logSequenceNumber);
                return;
            case 3: // record updated
                applyRecordChange(MutationType.UPDATE, logEntry, logSequenceNumber);
                return;
            case 4: // record deleted
                applyRecordChange(MutationType.DELETE, logEntry, logSequenceNumber);
                return;
            case 5: // transaction started: begin buffering for it
                this.transactions.put(logEntry.transactionId, new TransactionHolder());
                return;
            case 6: // transaction completed: publish what was buffered
                commitTransaction(logEntry, logSequenceNumber);
                return;
            case 7: // transaction abandoned: drop what was buffered
                this.transactions.remove(logEntry.transactionId);
                return;
            case 8: // changed table definition
                applyTableDefinition(MutationType.ALTER_TABLE, logEntry, logSequenceNumber);
                return;
            case 9: { // table dropped
                // Resolve the definition before it is masked inside the transaction.
                Table table = lookupTable(logSequenceNumber, logEntry);
                if (logEntry.transactionId > 0) {
                    requireTransaction(logEntry.transactionId).tablesDefinitions.put(logEntry.tableName, null);
                }
                fire(new Mutation(table, MutationType.DROP_TABLE, null, logSequenceNumber, logEntry.timestamp), logEntry.transactionId);
                return;
            }
            default:
                return;
        }
    }

    /**
     * Creates and starts the ZooKeeper metadata storage manager, reading address,
     * session timeout and base path from the client configuration.
     */
    private static ZookeeperMetadataStorageManager buildMetadataStorageManager(ClientConfiguration clientConfiguration) throws MetadataStorageManagerException {
        String address = clientConfiguration.getString(ClientConfiguration.PROPERTY_ZOOKEEPER_ADDRESS, "localhost:1281");
        int sessionTimeout = clientConfiguration.getInt(ClientConfiguration.PROPERTY_ZOOKEEPER_SESSIONTIMEOUT, 60000);
        String path = clientConfiguration.getString(ClientConfiguration.PROPERTY_ZOOKEEPER_PATH, "/herd");
        ZookeeperMetadataStorageManager metadataStorageManager = new ZookeeperMetadataStorageManager(address, sessionTimeout, path);
        metadataStorageManager.start(false);
        return metadataStorageManager;
    }
}
