/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ignite.stream;

import java.util.Collection;
import java.util.Map;
import javax.cache.processor.EntryProcessor;
import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.internal.util.lang.GridPeerDeployAware;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.stream.StreamReceiver;

/**
 * Base class for stream receivers that update cache entries through an entry processor.
 * <p>
 * A transformer plays two roles at once: it is the {@link StreamReceiver} that gets the
 * streamed entries, and it is the {@link EntryProcessor} applied to them. For every streamed
 * entry it invokes itself on the entry's key and passes the streamed value as the invocation
 * argument. Subclasses provide the update logic by implementing
 * {@link #process(MutableEntry, Object...)}.
 *
 * @param <K> Cache key type.
 * @param <V> Cache value type.
 */
public abstract class StreamTransformer<K, V> implements StreamReceiver<K, V>, EntryProcessor<K, V, Object> {
    /** Serial version UID. */
    private static final long serialVersionUID = 0L;

    /**
     * Compatibility mode flag, read once from the
     * {@code IGNITE_STREAM_TRANSFORMER_COMPATIBILITY_MODE} system property at class initialization.
     */
    private static final boolean compatibilityMode = IgniteSystemProperties.getBoolean("IGNITE_STREAM_TRANSFORMER_COMPATIBILITY_MODE");

    /**
     * Applies this transformer to every streamed entry, in iteration order.
     *
     * @param cache Cache the entry processor is invoked on.
     * @param entries Streamed entries; each key is the invocation target and each value is
     *      handed to the processor as its argument.
     * @throws IgniteException Declared by the receiver contract.
     */
    @Override
    public void receive(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) throws IgniteException {
        for (Map.Entry<K, V> streamed : entries) {
            K key = streamed.getKey();
            V val = streamed.getValue();

            cache.invoke(key, this, val);
        }
    }

    /**
     * Creates a transformer that delegates to the given cache entry processor.
     * <p>
     * Unless compatibility mode is enabled, the processor is wrapped in an
     * {@link EntryProcessorWrapper}; in compatibility mode an anonymous subclass that simply
     * forwards to the processor is returned instead.
     *
     * @param ep Entry processor to delegate to.
     * @param <K> Cache key type.
     * @param <V> Cache value type.
     * @return Stream transformer backed by {@code ep}.
     */
    public static <K, V> StreamTransformer<K, V> from(final CacheEntryProcessor<K, V, Object> ep) {
        if (!compatibilityMode) {
            return new EntryProcessorWrapper<>(ep);
        }

        // Compatibility mode: plain anonymous delegate without the peer-deploy-aware wrapper.
        return new StreamTransformer<K, V>() {
            @Override
            public Object process(MutableEntry<K, V> entry, Object... args) throws EntryProcessorException {
                return ep.process(entry, args);
            }
        };
    }

    /**
     * Transformer that forwards to a wrapped entry processor and reports that processor's class
     * and class loader through {@link GridPeerDeployAware}.
     * NOTE(review): presumably this lets peer deployment resolve the user's processor class
     * rather than this wrapper - confirm against the deployment code.
     *
     * @param <K> Cache key type.
     * @param <V> Cache value type.
     */
    private static class EntryProcessorWrapper<K, V> extends StreamTransformer<K, V> implements GridPeerDeployAware {
        /** Serial version UID. */
        private static final long serialVersionUID = 0L;

        /** Wrapped entry processor that performs the actual update. */
        private CacheEntryProcessor<K, V, Object> ep;

        /** Lazily detected class loader of the deploy class; transient, so not serialized. */
        private transient ClassLoader ldr;

        /**
         * @param ep Entry processor to wrap.
         */
        EntryProcessorWrapper(CacheEntryProcessor<K, V, Object> ep) {
            this.ep = ep;
        }

        /**
         * Forwards the call to the wrapped entry processor unchanged.
         *
         * @param entry Entry being processed.
         * @param args Invocation arguments.
         * @return Whatever the wrapped processor returns.
         * @throws EntryProcessorException Declared by the entry processor contract.
         */
        @Override
        public Object process(MutableEntry<K, V> entry, Object... args) throws EntryProcessorException {
            return ep.process(entry, args);
        }

        /**
         * @return Runtime class of the wrapped entry processor.
         */
        @Override
        public Class<?> deployClass() {
            return ep.getClass();
        }

        /**
         * Returns the class loader detected for {@link #deployClass()}, detecting it on first
         * use and caching the result in {@link #ldr}.
         *
         * @return Class loader for the deploy class.
         */
        @Override
        public ClassLoader classLoader() {
            ClassLoader cached = ldr;

            if (cached != null) {
                return cached;
            }

            cached = U.detectClassLoader(deployClass());
            ldr = cached;

            return cached;
        }
    }
}

