/*
 * Decompiled with CFR 0.152.
 */
package org.symphonyoss.s2.canon.runtime;

import java.io.IOException;
import java.io.Reader;
import org.symphonyoss.s2.canon.runtime.IEntity;
import org.symphonyoss.s2.canon.runtime.IModelRegistry;
import org.symphonyoss.s2.common.exception.InvalidValueException;
import org.symphonyoss.s2.fugue.core.trace.ITraceContext;
import org.symphonyoss.s2.fugue.pipeline.IThreadSafeConsumer;
import org.symphonyoss.s2.fugue.pipeline.IThreadSafeErrorConsumer;

public abstract class EntityConsumer<P, E extends IEntity, C extends IThreadSafeConsumer<E>>
implements IThreadSafeConsumer<P> {
    private final IModelRegistry modelRegistry_;
    private final Class<E> entityType_;
    private final C consumer_;
    private final IThreadSafeErrorConsumer<P> invalidMessageConsumer_;

    public EntityConsumer(IModelRegistry modelRegistry, Class<E> entityType, C consumer, IThreadSafeErrorConsumer<P> invalidMessageConsumer) {
        this.modelRegistry_ = modelRegistry;
        this.entityType_ = entityType;
        this.consumer_ = consumer;
        this.invalidMessageConsumer_ = invalidMessageConsumer;
    }

    public void consume(P item, ITraceContext trace) {
        try {
            IEntity modelObject = this.modelRegistry_.parseOne(this.getReader(item));
            if (this.entityType_.isInstance(modelObject)) {
                this.consumer_.consume(this.entityType_.cast(modelObject), trace);
            } else {
                this.invalidMessageConsumer_.consume(item, trace, "Received an entity which is not an instance of " + this.entityType_.getName(), null);
            }
        }
        catch (IOException | InvalidValueException e) {
            this.invalidMessageConsumer_.consume(item, trace, "Received an entity which is not a known type", e);
        }
    }

    protected abstract Reader getReader(P var1);

    public void close() {
        this.consumer_.close();
    }
}

