/*
 * Decompiled with CFR 0.152.
 */
package org.teamapps.universaldb;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Method;
import java.util.BitSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
import org.teamapps.cluster.model.cluster.AbstractDbLeader;
import org.teamapps.cluster.model.cluster.DbLeaderClient;
import org.teamapps.cluster.model.cluster.DbTransactionList;
import org.teamapps.cluster.model.cluster.DbTransactionListRequest;
import org.teamapps.cluster.network.NodeAddress;
import org.teamapps.cluster.service.ClusterTopic;
import org.teamapps.cluster.service.TeamAppsCluster;
import org.teamapps.protocol.service.ServiceRegistry;
import org.teamapps.universaldb.index.ColumnIndex;
import org.teamapps.universaldb.index.DataBaseMapper;
import org.teamapps.universaldb.index.DatabaseIndex;
import org.teamapps.universaldb.index.IndexType;
import org.teamapps.universaldb.index.SchemaIndex;
import org.teamapps.universaldb.index.TableIndex;
import org.teamapps.universaldb.index.file.FileIndex;
import org.teamapps.universaldb.index.file.FileStore;
import org.teamapps.universaldb.index.file.FileValue;
import org.teamapps.universaldb.index.file.LocalFileStore;
import org.teamapps.universaldb.index.log.LogIterator;
import org.teamapps.universaldb.index.reference.CyclicReferenceUpdate;
import org.teamapps.universaldb.index.reference.multi.MultiReferenceIndex;
import org.teamapps.universaldb.index.reference.single.SingleReferenceIndex;
import org.teamapps.universaldb.index.reference.value.MultiReferenceEditValue;
import org.teamapps.universaldb.index.reference.value.RecordReference;
import org.teamapps.universaldb.index.reference.value.ResolvedMultiReferenceUpdate;
import org.teamapps.universaldb.index.text.FullTextIndexValue;
import org.teamapps.universaldb.index.transaction.TransactionIndex;
import org.teamapps.universaldb.index.transaction.TransactionType;
import org.teamapps.universaldb.index.transaction.request.TransactionRequest;
import org.teamapps.universaldb.index.transaction.request.TransactionRequestRecord;
import org.teamapps.universaldb.index.transaction.request.TransactionRequestRecordType;
import org.teamapps.universaldb.index.transaction.request.TransactionRequestRecordValue;
import org.teamapps.universaldb.index.transaction.resolved.ResolvedTransaction;
import org.teamapps.universaldb.index.transaction.resolved.ResolvedTransactionRecord;
import org.teamapps.universaldb.index.transaction.resolved.ResolvedTransactionRecordType;
import org.teamapps.universaldb.index.transaction.resolved.ResolvedTransactionRecordValue;
import org.teamapps.universaldb.index.translation.TranslatableText;
import org.teamapps.universaldb.pojo.AbstractUdbEntity;
import org.teamapps.universaldb.schema.Database;
import org.teamapps.universaldb.schema.Schema;
import org.teamapps.universaldb.schema.SchemaInfoProvider;
import org.teamapps.universaldb.schema.Table;
import org.teamapps.universaldb.update.RecordUpdateEvent;
import reactor.core.publisher.Mono;

public class UniversalDB
implements DataBaseMapper {
    public static final Marker SKIP_DB_LOGGING = MarkerFactory.getMarker((String)"SKIP_DB_LOGGING");
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final ThreadLocal<Integer> THREAD_LOCAL_USER_ID = ThreadLocal.withInitial(() -> 0);
    private final File storagePath;
    private final SchemaIndex schemaIndex;
    private TransactionIndex transactionIndex;
    private final Map<Integer, DatabaseIndex> databaseById = new HashMap<Integer, DatabaseIndex>();
    private final Map<Integer, TableIndex> tableById = new HashMap<Integer, TableIndex>();
    private final Map<Integer, ColumnIndex> columnById = new HashMap<Integer, ColumnIndex>();
    private final Map<TableIndex, Class> entityClassByTableIndex = new HashMap<TableIndex, Class>();
    private final Map<TableIndex, Class> queryClassByTableIndex = new HashMap<TableIndex, Class>();
    private final Map<String, TableIndex> tableIndexByPath = new HashMap<String, TableIndex>();
    private final ArrayBlockingQueue<RecordUpdateEvent> updateEventQueue = new ArrayBlockingQueue(25000);
    private TeamAppsCluster cluster;
    private DbLeaderClient dbLeaderClient;
    private ClusterTopic clusterClientTopic;
    private final Map<Long, CompletableFuture<ResolvedTransaction>> transactionCompletableFutureMap = new ConcurrentHashMap<Long, CompletableFuture<ResolvedTransaction>>();
    private ClusterTopic leaderTransactionClusterMessageQueue;
    private volatile boolean active = true;

    public static int getUserId() {
        return THREAD_LOCAL_USER_ID.get();
    }

    public static void setUserId(int userId) {
        THREAD_LOCAL_USER_ID.set(userId);
    }

    public static UniversalDB createStandalone(File storagePath, SchemaInfoProvider schemaInfoProvider) throws Exception {
        LocalFileStore fileStore = new LocalFileStore(new File(storagePath, "file-store"));
        return new UniversalDB(storagePath, schemaInfoProvider, fileStore);
    }

    public static UniversalDB createStandalone(File storagePath, File fileStorePath, SchemaInfoProvider schemaInfoProvider) throws Exception {
        LocalFileStore fileStore = new LocalFileStore(fileStorePath);
        return new UniversalDB(storagePath, schemaInfoProvider, fileStore);
    }

    public static UniversalDB createStandalone(File storagePath, SchemaInfoProvider schemaInfoProvider, FileStore fileStore) throws Exception {
        return new UniversalDB(storagePath, schemaInfoProvider, fileStore);
    }

    private UniversalDB(File storagePath, SchemaInfoProvider schemaInfo, FileStore fileStore) throws Exception {
        AbstractUdbEntity.setDatabase(this);
        this.storagePath = storagePath;
        this.transactionIndex = new TransactionIndex(storagePath);
        this.createShutdownHook();
        Schema schema = schemaInfo.getSchema();
        this.schemaIndex = new SchemaIndex(schema, storagePath);
        this.schemaIndex.setFileStore(fileStore);
        this.mapSchema(schema, true);
        this.installTableClasses(this.transactionIndex.getCurrentSchema(), UniversalDB.class.getClassLoader(), false);
    }

    public UniversalDB(File storagePath, SchemaInfoProvider schemaInfo, String clusterSecret, int port) throws Exception {
        LocalFileStore fileStore = new LocalFileStore(new File(storagePath, "file-store"));
        AbstractUdbEntity.setDatabase(this);
        this.storagePath = storagePath;
        this.transactionIndex = new TransactionIndex(storagePath);
        this.createShutdownHook();
        Schema schema = schemaInfo.getSchema();
        this.schemaIndex = new SchemaIndex(schema, storagePath);
        this.schemaIndex.setFileStore(fileStore);
        this.mapSchema(schema, true);
        this.installTableClasses(this.transactionIndex.getCurrentSchema(), UniversalDB.class.getClassLoader(), false);
        this.cluster = new TeamAppsCluster(clusterSecret, "" + this.transactionIndex.getNodeId(), null, port, new NodeAddress[0]);
        this.leaderTransactionClusterMessageQueue = this.cluster.createTopic("udb-resolved-transactions", null);
        this.cluster.createTopic("udb-transaction-requests", clusterTopicMessage -> {
            try {
                TransactionRequest request = new TransactionRequest(clusterTopicMessage.getData());
                this.executeTransaction(request);
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        });
        new AbstractDbLeader((ServiceRegistry)this.cluster){

            public DbTransactionList requestMissingTransactions(DbTransactionListRequest request) {
                try {
                    DbTransactionList dbTransactionList = new DbTransactionList();
                    long lastKnownTransactionId = request.getLastKnownTransactionId();
                    logger.info(SKIP_DB_LOGGING, "Client requested transactions with last known id: {}", (Object)lastKnownTransactionId);
                    LogIterator logIterator = UniversalDB.this.transactionIndex.getLogIterator();
                    File transactionsFile = File.createTempFile("transactions", ".temp");
                    DataOutputStream dos = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(transactionsFile, false)));
                    int counter = 0;
                    while (logIterator.hasNext()) {
                        byte[] bytes = logIterator.next();
                        ResolvedTransaction transaction = ResolvedTransaction.createResolvedTransaction(bytes);
                        if (transaction.getTransactionId() <= lastKnownTransactionId) continue;
                        dos.writeInt(bytes.length);
                        dos.write(bytes);
                        ++counter;
                    }
                    dos.close();
                    dbTransactionList.setLastKnownTransactionId(lastKnownTransactionId);
                    dbTransactionList.setTransactionsFile(transactionsFile);
                    dbTransactionList.setTransactionCount((long)counter);
                    logger.info(SKIP_DB_LOGGING, "Send transactions file, size: {}", (Object)transactionsFile.length());
                    logIterator.close();
                    return dbTransactionList;
                }
                catch (Exception e) {
                    e.printStackTrace();
                    return null;
                }
            }
        };
    }

    public UniversalDB(File storagePath, SchemaInfoProvider schemaInfo, String clusterSecret, int port, NodeAddress leaderNode) throws Exception {
        Schema schema;
        LocalFileStore fileStore = new LocalFileStore(new File(storagePath, "file-store"));
        AbstractUdbEntity.setDatabase(this);
        this.storagePath = storagePath;
        this.transactionIndex = new TransactionIndex(storagePath);
        this.createShutdownHook();
        boolean emptyTransactionIndex = this.transactionIndex.isEmpty();
        if (!emptyTransactionIndex) {
            schema = schemaInfo.getSchema();
            this.schemaIndex = new SchemaIndex(schema, storagePath);
            this.schemaIndex.setFileStore(fileStore);
            this.mapSchema(schema, false);
            this.installTableClasses(this.transactionIndex.getCurrentSchema(), UniversalDB.class.getClassLoader(), false);
        } else {
            schema = new Schema();
            this.schemaIndex = new SchemaIndex(schema, storagePath);
            this.schemaIndex.setFileStore(fileStore);
        }
        this.cluster = new TeamAppsCluster(clusterSecret, "" + this.transactionIndex.getNodeId(), null, port, new NodeAddress[]{leaderNode});
        this.clusterClientTopic = this.cluster.createTopic("udb-transaction-requests", null);
        ArrayBlockingQueue transactionQueue = new ArrayBlockingQueue(10000);
        AtomicBoolean syncingFinished = new AtomicBoolean(false);
        this.cluster.createTopic("udb-resolved-transactions", clusterTopicMessage -> {
            try {
                if (!this.active) {
                    return;
                }
                ResolvedTransaction transaction = ResolvedTransaction.createResolvedTransaction(clusterTopicMessage.getData());
                if (syncingFinished.get()) {
                    if (!transactionQueue.isEmpty()) {
                        while (!transactionQueue.isEmpty()) {
                            logger.info(SKIP_DB_LOGGING, "Run queued transactions: {}", (Object)transactionQueue.size());
                            ResolvedTransaction resolvedTransaction = (ResolvedTransaction)transactionQueue.take();
                            if (this.transactionIndex.getLastTransactionId() >= resolvedTransaction.getTransactionId()) {
                                logger.info(SKIP_DB_LOGGING, "Transaction already processed: {}, last known transaction id: {}", (Object)resolvedTransaction.getTransactionId(), (Object)this.transactionIndex.getLastTransactionId());
                                continue;
                            }
                            this.handleTransaction(resolvedTransaction);
                            CompletableFuture<ResolvedTransaction> completableFuture = this.transactionCompletableFutureMap.get(resolvedTransaction.getRequestId());
                            if (completableFuture == null) continue;
                            completableFuture.complete(resolvedTransaction);
                        }
                    }
                    this.handleTransaction(transaction);
                    CompletableFuture<ResolvedTransaction> completableFuture = this.transactionCompletableFutureMap.get(transaction.getRequestId());
                    if (completableFuture != null) {
                        completableFuture.complete(transaction);
                    }
                } else {
                    transactionQueue.offer(transaction);
                }
            }
            catch (Exception e) {
                throw new RuntimeException("Error syncing transactions", e);
            }
        });
        this.dbLeaderClient = new DbLeaderClient((ServiceRegistry)this.cluster);
        while (!this.dbLeaderClient.isAvailable()) {
            logger.info(SKIP_DB_LOGGING, "Wait for db leader service...");
            Thread.sleep(1000L);
        }
        logger.info(SKIP_DB_LOGGING, "Wait for logs...");
        Thread.sleep(3000L);
        Mono transactionListMono = this.dbLeaderClient.requestMissingTransactions(new DbTransactionListRequest().setLastKnownTransactionId(this.transactionIndex.getLastTransactionId()));
        DbTransactionList transactionList = (DbTransactionList)transactionListMono.block();
        if (transactionList.getTransactionCount() > 0L) {
            File transactionsFile = transactionList.getTransactionsFile();
            DataInputStream dis = new DataInputStream(new BufferedInputStream(new FileInputStream(transactionsFile)));
            logger.info(SKIP_DB_LOGGING, "Transactions file size: {}", (Object)transactionsFile.length());
            int loop = 0;
            int pos = 0;
            long lastTransactionId = 0L;
            try {
                while (true) {
                    int length = dis.readInt();
                    byte[] bytes = new byte[length];
                    dis.readFully(bytes);
                    pos += length;
                    ResolvedTransaction transaction = null;
                    try {
                        transaction = ResolvedTransaction.createResolvedTransaction(bytes);
                        lastTransactionId = transaction.getTransactionId();
                        this.handleTransaction(transaction);
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                    }
                    if (loop % 1000 == 0) {
                        logger.info(SKIP_DB_LOGGING, "Consumed transactions for far: {}, current transaction-id: {}, file pos: {}", new Object[]{loop, transaction == null ? "missing" : Long.valueOf(transaction.getTransactionId()), pos});
                    }
                    ++loop;
                }
            }
            catch (EOFException ignore) {
                logger.info(SKIP_DB_LOGGING, "Finished loading transactions, last transaction id: {}, consumed transactions: {}", (Object)lastTransactionId, (Object)loop);
                syncingFinished.set(true);
            }
        }
        logger.info(SKIP_DB_LOGGING, "Syncing transactions done");
    }

    public UniversalDB(File storagePath, LogIterator logIterator) throws Exception {
        AbstractUdbEntity.setDatabase(this);
        this.storagePath = storagePath;
        this.transactionIndex = new TransactionIndex(storagePath);
        LocalFileStore fileStore = new LocalFileStore(new File(storagePath, "file-store"));
        this.createShutdownHook();
        Schema schema = new Schema();
        this.schemaIndex = new SchemaIndex(schema, storagePath);
        this.schemaIndex.setFileStore(fileStore);
        long time = System.currentTimeMillis();
        long count = 0L;
        while (logIterator.hasNext()) {
            byte[] bytes = logIterator.next();
            ResolvedTransaction transaction = ResolvedTransaction.createResolvedTransaction(bytes);
            this.handleTransaction(transaction);
            ++count;
        }
        logger.info("Imported " + count + " transactions in: " + (System.currentTimeMillis() - time));
    }

    private void createShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                this.active = false;
                logger.info(SKIP_DB_LOGGING, "SHUTTING DOWN DATABASE");
                if (this.cluster != null) {
                    this.cluster.shutDown();
                }
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        }));
    }

    private void mapSchema(Schema schema, boolean executeAsTransaction) {
        if (!this.transactionIndex.isValidSchema(schema)) {
            throw new RuntimeException("Cannot load incompatible schema. Current schema is:\n" + this.transactionIndex.getCurrentSchema() + "\nNew schema is:\n" + schema);
        }
        if (executeAsTransaction && this.transactionIndex.isSchemaUpdate(schema)) {
            TransactionRequest modelUpdateTransactionRequest = this.createModelUpdateTransactionRequest(schema);
            this.executeTransaction(modelUpdateTransactionRequest);
        } else {
            this.schemaIndex.merge(this.transactionIndex.getCurrentSchema(), true, this);
            for (DatabaseIndex database : this.schemaIndex.getDatabases()) {
                this.databaseById.put(database.getMappingId(), database);
                for (TableIndex table : database.getTables()) {
                    this.tableById.put(table.getMappingId(), table);
                    for (ColumnIndex columnIndex : table.getColumnIndices()) {
                        this.columnById.put(columnIndex.getMappingId(), columnIndex);
                    }
                }
            }
        }
    }

    private void installTableClasses(Schema schema, ClassLoader classLoader, boolean allSchemaTablesAreMandatory) throws Exception {
        String pojoPath = schema.getPojoNamespace();
        for (Database database : schema.getDatabases()) {
            String path = pojoPath + "." + database.getName().toLowerCase();
            for (Table table : database.getAllTables()) {
                TableIndex tableIndex = table.isView() ? this.schemaIndex.getTableByPath(table.getReferencedTablePath()) : this.schemaIndex.getTable(table);
                String tableName = table.getName();
                try {
                    String className = path + ".Udb" + tableName.substring(0, 1).toUpperCase() + tableName.substring(1);
                    Class<?> schemaClass = Class.forName(className, true, classLoader);
                    Method method = schemaClass.getDeclaredMethod("setTableIndex", TableIndex.class);
                    method.setAccessible(true);
                    method.invoke(null, tableIndex);
                    if (table.isView()) continue;
                    this.entityClassByTableIndex.put(tableIndex, schemaClass);
                    this.tableIndexByPath.put(tableIndex.getFQN(), tableIndex);
                    String queryClassName = path + ".Udb" + tableName.substring(0, 1).toUpperCase() + tableName.substring(1) + "Query";
                    Class<?> queryClass = Class.forName(queryClassName, true, classLoader);
                    this.queryClassByTableIndex.put(tableIndex, queryClass);
                }
                catch (ClassNotFoundException e) {
                    if (allSchemaTablesAreMandatory) {
                        throw e;
                    }
                    logger.info("Could not load entity class for tableIndex:" + tableIndex.getFQN());
                }
                catch (Exception e) {
                    throw e;
                }
            }
        }
    }

    public void addAuxiliaryModel(SchemaInfoProvider schemaInfo, ClassLoader classLoader) throws Exception {
        Schema schema = schemaInfo.getSchema();
        if (!this.transactionIndex.isValidSchema(schema)) {
            throw new RuntimeException("Cannot load incompatible schema. Current schema is:\n" + this.transactionIndex.getCurrentSchema() + "\nNew schema is:\n" + schema);
        }
        if (this.transactionIndex.isSchemaUpdate(schema)) {
            TransactionRequest modelUpdateTransactionRequest = this.createModelUpdateTransactionRequest(schema);
            this.executeTransaction(modelUpdateTransactionRequest);
        }
        this.installTableClasses(schema, classLoader, true);
    }

    public Class getEntityClass(TableIndex tableIndex) {
        return this.entityClassByTableIndex.get(tableIndex);
    }

    public Class getQueryClass(TableIndex tableIndex) {
        return this.queryClassByTableIndex.get(tableIndex);
    }

    public TableIndex getTableIndexByPath(String path) {
        return this.tableIndexByPath.get(path);
    }

    public synchronized TransactionRequest createTransactionRequest() {
        return new TransactionRequest(this.transactionIndex.getNodeId(), this.transactionIndex.createTransactionRequestId(), UniversalDB.getUserId());
    }

    public synchronized TransactionRequest createModelUpdateTransactionRequest(Schema schema) {
        return new TransactionRequest(this.transactionIndex.getNodeId(), this.transactionIndex.createTransactionRequestId(), UniversalDB.getUserId(), schema);
    }

    public synchronized void createInitialTableTransactions(TableIndex tableIndex) throws Exception {
        if (!tableIndex.getRecordVersioningIndex().isEmpty()) {
            return;
        }
        BitSet records = tableIndex.getRecords();
        int id = records.nextSetBit(0);
        while (id >= 0) {
            this.writeInitialTransaction(tableIndex, id, false);
            id = records.nextSetBit(id + 1);
        }
        if (tableIndex.isKeepDeletedRecords()) {
            records = tableIndex.getDeletedRecords();
            id = records.nextSetBit(0);
            while (id >= 0) {
                this.writeInitialTransaction(tableIndex, id, true);
                id = records.nextSetBit(id + 1);
            }
        }
    }

    private void writeInitialTransaction(TableIndex tableIndex, int id, boolean deleted) throws Exception {
        int recordId = id;
        ResolvedTransaction transaction = this.createInitialTransaction(tableIndex, recordId, false);
        ResolvedTransactionRecord record = new ResolvedTransactionRecord(ResolvedTransactionRecordType.CREATE_WITH_ID, tableIndex.getMappingId(), recordId);
        transaction.addTransactionRecord(record);
        List columnIndices = tableIndex.getColumnIndices().stream().filter(col -> !col.isEmpty(recordId)).collect(Collectors.toList());
        for (ColumnIndex column : columnIndices) {
            ResolvedTransactionRecordValue recordValue = this.createInitialTransactionRecordValue(column, recordId);
            record.addRecordValue(recordValue);
        }
        tableIndex.getRecordVersioningIndex().writeRecordUpdate(transaction, record);
        this.transactionIndex.writeTransaction(transaction);
        if (deleted) {
            transaction = this.createInitialTransaction(tableIndex, recordId, true);
            record = new ResolvedTransactionRecord(ResolvedTransactionRecordType.DELETE, tableIndex.getMappingId(), recordId);
            transaction.addTransactionRecord(record);
            record.addRecordValue(this.createInitialTransactionRecordValue(tableIndex.getColumnIndex("metaDeletionDate"), recordId));
            record.addRecordValue(this.createInitialTransactionRecordValue(tableIndex.getColumnIndex("metaDeletedBy"), recordId));
            tableIndex.getRecordVersioningIndex().writeRecordUpdate(transaction, record);
            this.transactionIndex.writeTransaction(transaction);
        }
    }

    private ResolvedTransaction createInitialTransaction(TableIndex tableIndex, int recordId, boolean deleted) {
        long transactionId = this.transactionIndex.getLastTransactionId() + 1L;
        int userId = 0;
        int timestamp = 0;
        ColumnIndex dateColumn = tableIndex.getColumnIndex(deleted ? "metaDeletionDate" : "metaCreationDate");
        ColumnIndex userRefColumn = tableIndex.getColumnIndex(deleted ? "metaDeletedBy" : "metaCreatedBy");
        if (dateColumn != null && userRefColumn != null) {
            userId = (Integer)userRefColumn.getGenericValue(recordId);
            timestamp = (Integer)dateColumn.getGenericValue(recordId);
        }
        return new ResolvedTransaction(this.transactionIndex.getNodeId(), this.transactionIndex.createTransactionRequestId(), transactionId, userId, (long)timestamp * 1000L);
    }

    private ResolvedTransactionRecordValue createInitialTransactionRecordValue(ColumnIndex column, int recordId) {
        switch (column.getType()) {
            case BOOLEAN: 
            case SHORT: 
            case INT: 
            case LONG: 
            case FLOAT: 
            case DOUBLE: 
            case TEXT: 
            case TRANSLATABLE_TEXT: 
            case BINARY: {
                Object value = column.getGenericValue(recordId);
                return new ResolvedTransactionRecordValue(column.getMappingId(), column.getType(), value);
            }
            case REFERENCE: {
                SingleReferenceIndex singleReferenceIndex = (SingleReferenceIndex)column;
                int referencedRecordId = singleReferenceIndex.getValue(recordId);
                return new ResolvedTransactionRecordValue(column.getMappingId(), column.getType(), referencedRecordId);
            }
            case MULTI_REFERENCE: {
                MultiReferenceIndex multiReferenceIndex = (MultiReferenceIndex)column;
                List<Integer> references = multiReferenceIndex.getReferencesAsList(recordId);
                ResolvedMultiReferenceUpdate multiReferenceUpdate = ResolvedMultiReferenceUpdate.createSetReferences(references);
                return new ResolvedTransactionRecordValue(column.getMappingId(), column.getType(), multiReferenceUpdate);
            }
            case FILE: {
                FileIndex fileIndex = (FileIndex)column;
                FileValue fileValue = fileIndex.getValue(recordId);
                return new ResolvedTransactionRecordValue(column.getMappingId(), column.getType(), fileValue);
            }
        }
        return null;
    }

    public ResolvedTransaction executeTransaction(TransactionRequest transaction) {
        try {
            if (this.clusterClientTopic != null) {
                if (!this.active) {
                    return null;
                }
                if (transaction.getTransactionType() == TransactionType.MODEL_UPDATE) {
                    return null;
                }
                CompletableFuture completableFuture = new CompletableFuture();
                this.transactionCompletableFutureMap.put(transaction.getRequestId(), completableFuture);
                this.clusterClientTopic.sendMessageAsync(transaction.getBytes());
                ResolvedTransaction resolvedTransaction = (ResolvedTransaction)completableFuture.get();
                resolvedTransaction.getRecordIdByCorrelationId().entrySet().forEach(entry -> transaction.putResolvedRecordIdForCorrelationId((Integer)entry.getKey(), (Integer)entry.getValue()));
                return resolvedTransaction;
            }
            return this.handleTransactionRequest(transaction);
        }
        catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    private synchronized ResolvedTransaction handleTransactionRequest(TransactionRequest transactionRequest) throws Exception {
        TransactionType transactionType = transactionRequest.getTransactionType();
        long transactionId = this.transactionIndex.getLastTransactionId() + 1L;
        ResolvedTransaction resolvedTransaction = ResolvedTransaction.createFromRequest(transactionId, transactionRequest);
        if (transactionType == TransactionType.DATA_UPDATE) {
            this.handleDataUpdateRequest(transactionRequest, resolvedTransaction);
        } else {
            this.handleModelUpdateRequest(transactionRequest, resolvedTransaction);
        }
        if (this.leaderTransactionClusterMessageQueue != null) {
            this.leaderTransactionClusterMessageQueue.sendMessageAsync(resolvedTransaction.getBytes());
        }
        return resolvedTransaction;
    }

    private void handleModelUpdateRequest(TransactionRequest request, ResolvedTransaction resolvedTransaction) throws Exception {
        Schema schema = request.getSchema();
        if (!this.transactionIndex.isValidSchema(schema)) {
            throw new RuntimeException("Cannot update incompatible schema. Current schema is:\n" + this.transactionIndex.getCurrentSchema() + "\nNew schema is:\n" + schema);
        }
        Schema localSchema = this.transactionIndex.getCurrentSchema();
        if (localSchema != null) {
            localSchema.merge(schema);
        } else {
            localSchema = schema;
        }
        localSchema.mapSchema();
        resolvedTransaction.setSchema(localSchema);
        this.transactionIndex.writeTransaction(resolvedTransaction);
        this.transactionIndex.writeSchemaUpdate(resolvedTransaction.getSchemaUpdate());
        this.schemaIndex.merge(localSchema, true, this);
        for (DatabaseIndex database : this.schemaIndex.getDatabases()) {
            this.databaseById.put(database.getMappingId(), database);
            for (TableIndex table : database.getTables()) {
                this.tableById.put(table.getMappingId(), table);
                for (ColumnIndex columnIndex : table.getColumnIndices()) {
                    this.columnById.put(columnIndex.getMappingId(), columnIndex);
                }
            }
        }
    }

    private void handleDataUpdateRequest(TransactionRequest request, ResolvedTransaction resolvedTransaction) throws Exception {
        int recordId;
        TableIndex tableIndex;
        for (TransactionRequestRecord record : request.getRecords()) {
            if (record.getRecordType() != TransactionRequestRecordType.CREATE && record.getRecordType() != TransactionRequestRecordType.CREATE_WITH_ID) continue;
            tableIndex = this.getTableIndexById(record.getTableId());
            recordId = tableIndex.createRecord(record.getRecordId());
            request.putResolvedRecordIdForCorrelationId(record.getCorrelationId(), recordId);
        }
        for (TransactionRequestRecord record : request.getRecords()) {
            tableIndex = this.getTableIndexById(record.getTableId());
            if (record.isTransactionProcessingStarted()) {
                logger.error("Prevented processing of record again:" + record.getTableId() + ":" + record.getRecordId());
                continue;
            }
            record.setTransactionProcessingStarted(true);
            recordId = record.getRecordId() != 0 ? record.getRecordId() : request.getResolvedRecordIdByCorrelationId(record.getCorrelationId());
            ResolvedTransactionRecord resolvedRecord = ResolvedTransactionRecord.createFromRequest(record, recordId);
            resolvedTransaction.addTransactionRecord(resolvedRecord);
            switch (record.getRecordType()) {
                case CREATE: 
                case CREATE_WITH_ID: 
                case UPDATE: {
                    for (TransactionRequestRecordValue recordValue : record.getRecordValues()) {
                        List<CyclicReferenceUpdate> cyclicReferenceUpdates = this.persistColumnValueUpdates(recordId, recordValue, request.getRecordIdByCorrelationId(), resolvedRecord);
                        if (cyclicReferenceUpdates == null || cyclicReferenceUpdates.isEmpty()) continue;
                        Iterator<Object> iterator = cyclicReferenceUpdates.iterator();
                        while (iterator.hasNext()) {
                            CyclicReferenceUpdate referenceUpdate = (CyclicReferenceUpdate)iterator.next();
                            resolvedTransaction.addTransactionRecord(ResolvedTransactionRecord.createCyclicRecord(referenceUpdate));
                        }
                    }
                    List<FullTextIndexValue> fullTextIndexValues = record.getRecordValues().stream().filter(value -> value.getIndexType() == IndexType.TEXT || value.getIndexType() == IndexType.TRANSLATABLE_TEXT).map(value -> {
                        String columnName = this.getColumnById(value.getColumnId()).getName();
                        return value.getIndexType() == IndexType.TEXT ? new FullTextIndexValue(columnName, (String)value.getValue()) : new FullTextIndexValue(columnName, (TranslatableText)value.getValue());
                    }).collect(Collectors.toList());
                    if (fullTextIndexValues.isEmpty()) break;
                    tableIndex.updateFullTextIndex(recordId, fullTextIndexValues, record.getRecordType() == TransactionRequestRecordType.UPDATE);
                    break;
                }
                case DELETE: {
                    List<CyclicReferenceUpdate> cyclicReferenceUpdates = tableIndex.deleteRecord(record.getRecordId());
                    for (TransactionRequestRecordValue recordValue : record.getRecordValues()) {
                        this.persistColumnValueUpdates(recordId, recordValue, request.getRecordIdByCorrelationId(), resolvedRecord);
                    }
                    if (cyclicReferenceUpdates == null || cyclicReferenceUpdates.isEmpty()) break;
                    for (CyclicReferenceUpdate referenceUpdate : cyclicReferenceUpdates) {
                        resolvedTransaction.addTransactionRecord(ResolvedTransactionRecord.createCyclicRecord(referenceUpdate));
                    }
                    break;
                }
                case RESTORE: {
                    List<CyclicReferenceUpdate> cyclicReferenceUpdates2 = tableIndex.restoreRecord(record.getRecordId());
                    for (TransactionRequestRecordValue recordValue : record.getRecordValues()) {
                        this.persistColumnValueUpdates(recordId, recordValue, request.getRecordIdByCorrelationId(), resolvedRecord);
                    }
                    if (cyclicReferenceUpdates2 == null || cyclicReferenceUpdates2.isEmpty()) break;
                    for (CyclicReferenceUpdate referenceUpdate : cyclicReferenceUpdates2) {
                        resolvedTransaction.addTransactionRecord(ResolvedTransactionRecord.createCyclicRecord(referenceUpdate));
                    }
                    break;
                }
            }
            this.addRecordUpdateEvent(resolvedRecord, resolvedTransaction.getUserId());
        }
        this.transactionIndex.writeTransaction(resolvedTransaction);
        for (ResolvedTransactionRecord transactionRecord : resolvedTransaction.getTransactionRecords()) {
            tableIndex = this.getTableIndexById(transactionRecord.getTableId());
            tableIndex.getRecordVersioningIndex().writeRecordUpdate(resolvedTransaction, transactionRecord);
        }
        resolvedTransaction.setRecordIdByCorrelationId(request.getRecordIdByCorrelationId());
    }

    private synchronized void handleTransaction(ResolvedTransaction transaction) throws Exception {
        if (transaction.getTransactionType() == TransactionType.DATA_UPDATE) {
            this.handleDataUpdateTransaction(transaction);
        } else {
            this.handleModelUpdateTransaction(transaction);
        }
    }

    private void handleModelUpdateTransaction(ResolvedTransaction transaction) throws Exception {
        Schema schema = transaction.getSchema();
        Schema localSchema = this.transactionIndex.getCurrentSchema();
        if (localSchema != null) {
            localSchema.merge(schema);
        } else {
            localSchema = schema;
        }
        localSchema.mapSchema();
        this.transactionIndex.writeTransaction(transaction);
        this.transactionIndex.writeSchemaUpdate(transaction.getSchemaUpdate());
        this.schemaIndex.merge(localSchema, true, this);
        for (DatabaseIndex database : this.schemaIndex.getDatabases()) {
            this.databaseById.put(database.getMappingId(), database);
            for (TableIndex table : database.getTables()) {
                this.tableById.put(table.getMappingId(), table);
                for (ColumnIndex columnIndex : table.getColumnIndices()) {
                    this.columnById.put(columnIndex.getMappingId(), columnIndex);
                }
            }
        }
    }

    private void handleDataUpdateTransaction(ResolvedTransaction transaction) throws Exception {
        TableIndex tableIndex;
        for (ResolvedTransactionRecord record : transaction.getTransactionRecords()) {
            tableIndex = this.getTableIndexById(record.getTableId());
            switch (record.getRecordType()) {
                case CREATE: 
                case CREATE_WITH_ID: 
                case UPDATE: {
                    if (record.getRecordType() == ResolvedTransactionRecordType.CREATE || record.getRecordType() == ResolvedTransactionRecordType.CREATE_WITH_ID) {
                        tableIndex.createRecord(record.getRecordId());
                    }
                    for (ResolvedTransactionRecordValue recordValue : record.getRecordValues()) {
                        this.persistColumnValueUpdates(record.getRecordId(), recordValue);
                    }
                    List<FullTextIndexValue> fullTextIndexValues = record.getRecordValues().stream().filter(value -> value.getIndexType() == IndexType.TEXT || value.getIndexType() == IndexType.TRANSLATABLE_TEXT).map(value -> {
                        String columnName = this.getColumnById(value.getColumnId()).getName();
                        return value.getIndexType() == IndexType.TEXT ? new FullTextIndexValue(columnName, (String)value.getValue()) : new FullTextIndexValue(columnName, (TranslatableText)value.getValue());
                    }).collect(Collectors.toList());
                    if (fullTextIndexValues.isEmpty()) break;
                    tableIndex.updateFullTextIndex(record.getRecordId(), fullTextIndexValues, record.getRecordType() == ResolvedTransactionRecordType.UPDATE);
                    break;
                }
                case DELETE: {
                    tableIndex.deleteRecord(record.getRecordId());
                    for (ResolvedTransactionRecordValue recordValue : record.getRecordValues()) {
                        this.persistColumnValueUpdates(record.getRecordId(), recordValue);
                    }
                    break;
                }
                case RESTORE: {
                    tableIndex.restoreRecord(record.getRecordId());
                    for (ResolvedTransactionRecordValue recordValue : record.getRecordValues()) {
                        this.persistColumnValueUpdates(record.getRecordId(), recordValue);
                    }
                    break;
                }
                case ADD_CYCLIC_REFERENCE: {
                    break;
                }
            }
            this.addRecordUpdateEvent(record, transaction.getUserId());
        }
        this.transactionIndex.writeTransaction(transaction);
        for (ResolvedTransactionRecord transactionRecord : transaction.getTransactionRecords()) {
            tableIndex = this.getTableIndexById(transactionRecord.getTableId());
            tableIndex.getRecordVersioningIndex().writeRecordUpdate(transaction, transactionRecord);
        }
    }

    private List<CyclicReferenceUpdate> persistColumnValueUpdates(int recordId, TransactionRequestRecordValue recordValue, Map<Integer, Integer> recordIdByCorrelationId, ResolvedTransactionRecord resolvedRecord) {
        ColumnIndex column = this.getColumnById(recordValue.getColumnId());
        Object value = recordValue.getValue();
        if (column.getType() == IndexType.MULTI_REFERENCE) {
            MultiReferenceIndex multiReferenceIndex = (MultiReferenceIndex)column;
            MultiReferenceEditValue editValue = (MultiReferenceEditValue)value;
            editValue.updateReferences(recordIdByCorrelationId);
            ResolvedMultiReferenceUpdate resolvedUpdateValue = editValue.getResolvedUpdateValue();
            resolvedRecord.addRecordValue(new ResolvedTransactionRecordValue(recordValue.getColumnId(), recordValue.getIndexType(), resolvedUpdateValue));
            return multiReferenceIndex.setReferenceEditValue(recordId, editValue);
        }
        if (column.getType() == IndexType.REFERENCE) {
            SingleReferenceIndex singleReferenceIndex = (SingleReferenceIndex)column;
            if (value != null) {
                RecordReference recordReference = (RecordReference)value;
                recordReference.updateReference(recordIdByCorrelationId);
                resolvedRecord.addRecordValue(new ResolvedTransactionRecordValue(recordValue.getColumnId(), recordValue.getIndexType(), recordReference.getRecordId()));
                return singleReferenceIndex.setReferenceValue(recordId, recordReference);
            }
            resolvedRecord.addRecordValue(new ResolvedTransactionRecordValue(recordValue.getColumnId(), recordValue.getIndexType(), null));
            return singleReferenceIndex.setReferenceValue(recordId, null);
        }
        column.setGenericValue(recordId, value);
        resolvedRecord.addRecordValue(new ResolvedTransactionRecordValue(recordValue.getColumnId(), recordValue.getIndexType(), value));
        return null;
    }

    private void persistColumnValueUpdates(int recordId, ResolvedTransactionRecordValue recordValue) {
        ColumnIndex column = this.getColumnById(recordValue.getColumnId());
        Object value = recordValue.getValue();
        if (column.getType() == IndexType.MULTI_REFERENCE) {
            MultiReferenceIndex multiReferenceIndex = (MultiReferenceIndex)column;
            ResolvedMultiReferenceUpdate multiReferenceUpdate = (ResolvedMultiReferenceUpdate)value;
            multiReferenceIndex.setResolvedReferenceEditValue(recordId, multiReferenceUpdate);
        } else if (column.getType() == IndexType.REFERENCE) {
            SingleReferenceIndex singleReferenceIndex = (SingleReferenceIndex)column;
            if (value != null) {
                int referencedRecordId = (Integer)value;
                singleReferenceIndex.setValue(recordId, referencedRecordId, false);
            } else {
                singleReferenceIndex.setValue(recordId, 0, false);
            }
        } else {
            column.setGenericValue(recordId, value);
        }
    }

    public void createDatabaseDump(File dumpFolder) throws IOException {
        for (DatabaseIndex database : this.schemaIndex.getDatabases()) {
            File dbFolder = new File(dumpFolder, database.getName());
            dbFolder.mkdir();
            for (TableIndex table : database.getTables()) {
                File tableFolder = new File(dbFolder, table.getName());
                tableFolder.mkdir();
                BitSet records = table.getRecords();
                for (ColumnIndex columnIndex : table.getColumnIndices()) {
                    File dumpFile = new File(tableFolder, columnIndex.getName() + ".dbd");
                    columnIndex.dumpIndex(dumpFile, records);
                }
            }
        }
    }

    private void addRecordUpdateEvent(ResolvedTransactionRecord resolvedRecord, int userId) {
        if (userId > 0) {
            RecordUpdateEvent updateEvent = new RecordUpdateEvent(resolvedRecord.getTableId(), resolvedRecord.getRecordId(), userId, resolvedRecord.getRecordType().getUpdateType());
            this.updateEventQueue.offer(updateEvent);
        }
    }

    @Override
    public DatabaseIndex getDatabaseById(int mappingId) {
        return this.databaseById.get(mappingId);
    }

    @Override
    public TableIndex getTableIndexById(int mappingId) {
        return this.tableById.get(mappingId);
    }

    @Override
    public ColumnIndex getColumnById(int mappingId) {
        return this.columnById.get(mappingId);
    }

    public SchemaIndex getSchemaIndex() {
        return this.schemaIndex;
    }

    public TransactionIndex getTransactionIndex() {
        return this.transactionIndex;
    }

    public ArrayBlockingQueue<RecordUpdateEvent> getUpdateEventQueue() {
        return this.updateEventQueue;
    }
}

