/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.internal.processors.platform.cache.query;

import java.io.ObjectStreamException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.cache.Cache;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryListenerException;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.Query;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.binary.BinaryObjectImpl;
import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
import org.apache.ignite.internal.processors.cache.query.QueryCursorEx;
import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.processors.platform.PlatformJavaObjectFactoryProxy;
import org.apache.ignite.internal.processors.platform.PlatformTarget;
import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQuery;
import org.apache.ignite.internal.processors.platform.cache.query.PlatformQueryCursor;
import org.apache.ignite.internal.processors.platform.utils.PlatformUtils;
import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;

public class PlatformContinuousQueryImpl
implements PlatformContinuousQuery {
    private static final long serialVersionUID = 0L;
    protected final PlatformContext platformCtx;
    private final boolean hasFilter;
    protected final Object filter;
    protected final CacheEntryEventFilter javaFilter;
    private long ptr;
    private QueryCursor cursor;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private PlatformQueryCursor initialQryCur;

    public PlatformContinuousQueryImpl(PlatformContext platformCtx, long ptr, boolean hasFilter, Object filter) {
        assert (ptr != 0L);
        this.platformCtx = platformCtx;
        this.ptr = ptr;
        this.hasFilter = hasFilter;
        this.filter = filter;
        this.javaFilter = PlatformContinuousQueryImpl.getJavaFilter(filter, platformCtx.kernalContext());
        if (this.javaFilter != null && !(this.javaFilter instanceof CacheEntryEventSerializableFilter)) {
            throw new IgniteException("Java event filter must implement " + CacheEntryEventSerializableFilter.class.getName() + " interface: " + this.javaFilter.getClass().getName());
        }
    }

    private static CacheEntryEventFilter getJavaFilter(Object filter, GridKernalContext ctx) {
        BinaryObjectImpl bo;
        if (filter instanceof BinaryObjectImpl && (bo = (BinaryObjectImpl)filter).typeId() == 99) {
            PlatformJavaObjectFactoryProxy prx = (PlatformJavaObjectFactoryProxy)bo.deserialize();
            return (CacheEntryEventFilter)prx.factory(ctx).create();
        }
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void start(IgniteCacheProxy cache, boolean loc, int bufSize, long timeInterval, boolean autoUnsubscribe, Query initialQry) throws IgniteCheckedException {
        this.lock.writeLock().lock();
        try {
            try {
                ContinuousQuery qry = new ContinuousQuery();
                qry.setLocalListener(this);
                qry.setRemoteFilter(this);
                qry.setPageSize(bufSize);
                qry.setTimeInterval(timeInterval);
                qry.setAutoUnsubscribe(autoUnsubscribe);
                qry.setInitialQuery(initialQry);
                this.cursor = cache.query(qry.setLocal(loc));
                if (initialQry != null) {
                    this.initialQryCur = new PlatformQueryCursor(this.platformCtx, new QueryCursorEx<Cache.Entry>(){

                        @Override
                        public Iterator<Cache.Entry> iterator() {
                            return PlatformContinuousQueryImpl.this.cursor.iterator();
                        }

                        @Override
                        public List<Cache.Entry> getAll() {
                            return PlatformContinuousQueryImpl.this.cursor.getAll();
                        }

                        @Override
                        public void close() {
                        }

                        @Override
                        public void getAll(QueryCursorEx.Consumer<Cache.Entry> clo) throws IgniteCheckedException {
                            for (Cache.Entry t : this) {
                                clo.consume(t);
                            }
                        }

                        @Override
                        public List<GridQueryFieldMetadata> fieldsMeta() {
                            return null;
                        }
                    }, initialQry.getPageSize() > 0 ? initialQry.getPageSize() : 1024);
                }
            }
            catch (Exception e) {
                try {
                    this.close0();
                }
                catch (Exception exception) {
                    // empty catch block
                }
                throw PlatformUtils.unwrapQueryException(e);
            }
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    public void onUpdated(Iterable evts) throws CacheEntryListenerException {
        this.lock.readLock().lock();
        try {
            if (this.ptr == 0L) {
                throw new CacheEntryListenerException("Failed to notify listener because it has been closed.");
            }
            PlatformUtils.applyContinuousQueryEvents(this.platformCtx, this.ptr, evts);
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

    @Override
    public boolean evaluate(CacheEntryEvent evt) throws CacheEntryListenerException {
        if (this.javaFilter != null) {
            return this.javaFilter.evaluate(evt);
        }
        this.lock.readLock().lock();
        try {
            if (this.ptr == 0L) {
                throw new CacheEntryListenerException("Failed to evaluate the filter because it has been closed.");
            }
            boolean bl = !this.hasFilter || PlatformUtils.evaluateContinuousQueryEvent(this.platformCtx, this.ptr, evt);
            return bl;
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

    @Override
    public void onQueryUnregister() {
        this.close();
    }

    @Override
    public void close() {
        this.lock.writeLock().lock();
        try {
            this.close0();
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    @Override
    public PlatformTarget getInitialQueryCursor() {
        return this.initialQryCur;
    }

    private void close0() {
        if (this.ptr != 0L) {
            long ptr0 = this.ptr;
            this.ptr = 0L;
            if (this.cursor != null) {
                this.cursor.close();
            }
            this.platformCtx.gateway().continuousQueryFilterRelease(ptr0);
        }
    }

    Object writeReplace() throws ObjectStreamException {
        if (this.javaFilter != null) {
            return this.javaFilter;
        }
        return this.filter == null ? null : this.platformCtx.createContinuousQueryFilter(this.filter);
    }
}

