/*
 * Decompiled with CFR 0.152.
 */
package monasca.thresh.infrastructure.thresholding;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Values;
import java.io.Serializable;
import java.util.List;
import monasca.thresh.EventSpoutConfig;
import monasca.thresh.infrastructure.thresholding.KafkaSpout;
import monasca.thresh.infrastructure.thresholding.deserializer.EventDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spout that turns raw message bytes into tuples using an {@link EventDeserializer}.
 *
 * <p>Each message passed to {@link #processMessage(byte[], SpoutOutputCollector)} is
 * deserialized into a list of events; the first element of every event is emitted as a
 * single-field tuple, provided it is {@link Serializable}.
 */
public class EventSpout
extends KafkaSpout {
    private static final Logger logger = LoggerFactory.getLogger(EventSpout.class);
    private static final long serialVersionUID = 8457340455857276878L;
    private final EventDeserializer deserializer;

    /**
     * Creates the spout.
     *
     * @param configuration spout configuration, passed unchanged to the {@code KafkaSpout} constructor
     * @param deserializer converts raw message bytes into events and supplies the output fields
     */
    public EventSpout(EventSpoutConfig configuration, EventDeserializer deserializer) {
        super(configuration);
        this.deserializer = deserializer;
        logger.info("EventSpout created");
    }

    /**
     * Declares the output fields reported by the deserializer.
     *
     * @param declarer the declarer that receives the output fields
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(this.deserializer.getOutputFields());
    }

    /**
     * Deserializes one raw message and emits every usable event it contains.
     *
     * <p>Events that cannot be emitted (missing, empty, {@code null} payload, or a payload that
     * is not {@link Serializable}) are logged and skipped so that one bad entry does not
     * prevent the remaining events of the same message from being emitted.
     *
     * @param message the raw message bytes
     * @param collector the collector used to emit one tuple per usable event
     */
    @Override
    protected void processMessage(byte[] message, SpoutOutputCollector collector) {
        List<List<?>> events = this.deserializer.deserialize(message);
        if (events == null) {
            // The deserializer produced nothing for this message.
            return;
        }
        for (List<?> event : events) {
            if (event == null || event.isEmpty()) {
                // Guard event.get(0): a missing or empty entry has no payload to emit.
                logger.error("Deserialized event has no payload, skipping: {}", event);
                continue;
            }
            Object eventToSend = event.get(0);
            if (eventToSend == null) {
                // Must be checked separately: instanceof is false for null and the
                // "not Serializable" log below would dereference it via getClass().
                logger.error("Deserialized event payload is null, skipping");
                continue;
            }
            if (!(eventToSend instanceof Serializable)) {
                logger.error("Class {} is not Serializable: {}", eventToSend.getClass(), eventToSend);
                continue;
            }
            // Values is a List<Object>; the payload becomes the tuple's only field.
            collector.emit(new Values(eventToSend));
        }
    }
}

