/*
 * Decompiled with CFR 0.152.
 */
package monasca.thresh;

import backtype.storm.Config;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.BoltDeclarer;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import java.util.HashMap;
import javax.inject.Named;
import monasca.common.util.Injector;
import monasca.thresh.ThresholdingConfiguration;
import monasca.thresh.infrastructure.thresholding.AlarmCreationBolt;
import monasca.thresh.infrastructure.thresholding.AlarmThresholdingBolt;
import monasca.thresh.infrastructure.thresholding.EventProcessingBolt;
import monasca.thresh.infrastructure.thresholding.EventSpout;
import monasca.thresh.infrastructure.thresholding.MetricAggregationBolt;
import monasca.thresh.infrastructure.thresholding.MetricFilteringBolt;
import monasca.thresh.infrastructure.thresholding.MetricSpout;
import monasca.thresh.infrastructure.thresholding.deserializer.EventDeserializer;
import monasca.thresh.utils.StatsdMetricConsumer;

public class TopologyModule
extends AbstractModule {
    private final ThresholdingConfiguration config;
    private Config stormConfig;
    private IRichSpout metricSpout;
    private IRichSpout eventSpout;

    public TopologyModule(ThresholdingConfiguration config) {
        this.config = config;
    }

    public TopologyModule(ThresholdingConfiguration threshConfig, Config stormConfig, IRichSpout metricSpout, IRichSpout eventSpout) {
        this(threshConfig);
        this.stormConfig = stormConfig;
        this.metricSpout = metricSpout;
        this.eventSpout = eventSpout;
    }

    protected void configure() {
    }

    @Provides
    Config stormConfig() {
        if (this.stormConfig == null) {
            this.stormConfig = new Config();
            this.stormConfig.setNumWorkers(this.config.numWorkerProcesses.intValue());
            this.stormConfig.setNumAckers(this.config.numAckerThreads.intValue());
            HashMap<String, Object> statsdConfig = new HashMap<String, Object>();
            if (this.config.statsdConfig.getHost() != null) {
                statsdConfig.put("metrics.statsd.host", this.config.statsdConfig.getHost());
            }
            if (this.config.statsdConfig.getPort() != null) {
                statsdConfig.put("metrics.statsd.port", this.config.statsdConfig.getPort());
            }
            if (this.config.statsdConfig.getPrefix() != null) {
                statsdConfig.put("metrics.statsd.prefix", this.config.statsdConfig.getPrefix());
            }
            if (this.config.statsdConfig.getDimensions() != null) {
                statsdConfig.put("metrics.statsd.dimensions", this.config.statsdConfig.getDimensions());
            }
            this.stormConfig.registerMetricsConsumer(StatsdMetricConsumer.class, statsdConfig, 2L);
        }
        return this.stormConfig;
    }

    @Provides
    @Named(value="metrics")
    IRichSpout metricSpout() {
        return this.metricSpout == null ? new MetricSpout(this.config.metricSpoutConfig) : this.metricSpout;
    }

    @Provides
    @Named(value="event")
    IRichSpout eventSpout() {
        return this.eventSpout == null ? new EventSpout(this.config.eventSpoutConfig, new EventDeserializer()) : this.eventSpout;
    }

    @Provides
    StormTopology topology() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("metrics-spout", (IRichSpout)Injector.getInstance(IRichSpout.class, (String)"metrics"), (Number)this.config.metricSpoutThreads).setNumTasks((Number)this.config.metricSpoutTasks);
        builder.setSpout("event-spout", (IRichSpout)Injector.getInstance(IRichSpout.class, (String)"event"), (Number)this.config.eventSpoutThreads).setNumTasks((Number)this.config.eventSpoutTasks);
        ((BoltDeclarer)builder.setBolt("event-bolt", (IRichBolt)new EventProcessingBolt(this.config.database), (Number)this.config.eventBoltThreads).shuffleGrouping("event-spout")).setNumTasks((Number)this.config.eventBoltTasks);
        ((BoltDeclarer)((BoltDeclarer)((BoltDeclarer)builder.setBolt("filtering-bolt", (IRichBolt)new MetricFilteringBolt(this.config.database), (Number)this.config.filteringBoltThreads).fieldsGrouping("metrics-spout", new Fields(new String[]{MetricSpout.FIELDS[0]}))).allGrouping("event-bolt", "metric-alarm-events")).allGrouping("event-bolt", "alarm-definition-events")).setNumTasks((Number)this.config.filteringBoltTasks);
        ((BoltDeclarer)((BoltDeclarer)((BoltDeclarer)((BoltDeclarer)builder.setBolt("alarm-creation-bolt", (IRichBolt)new AlarmCreationBolt(this.config.database), (Number)this.config.alarmCreationBoltThreads).fieldsGrouping("filtering-bolt", "newMetricForAlarmDefinitionStream", new Fields(new String[]{AlarmCreationBolt.ALARM_CREATION_FIELDS[3]}))).allGrouping("event-bolt", "metric-sub-alarm-events")).allGrouping("event-bolt", "alarm-events")).allGrouping("event-bolt", "alarm-definition-events")).setNumTasks((Number)this.config.alarmCreationBoltTasks);
        ((BoltDeclarer)((BoltDeclarer)((BoltDeclarer)((BoltDeclarer)((BoltDeclarer)((BoltDeclarer)builder.setBolt("aggregation-bolt", (IRichBolt)new MetricAggregationBolt(this.config), (Number)this.config.aggregationBoltThreads).fieldsGrouping("filtering-bolt", new Fields(new String[]{MetricFilteringBolt.FIELDS[0]}))).allGrouping("filtering-bolt", "MetricAggregationControl")).fieldsGrouping("filtering-bolt", "alarm-creation-stream", new Fields(new String[]{AlarmCreationBolt.ALARM_CREATION_FIELDS[1]}))).allGrouping("event-bolt", "metric-sub-alarm-events")).fieldsGrouping("event-bolt", "metric-alarm-events", new Fields(new String[]{EventProcessingBolt.METRIC_ALARM_EVENT_STREAM_FIELDS[1]}))).fieldsGrouping("alarm-creation-bolt", "alarm-creation-stream", new Fields(new String[]{AlarmCreationBolt.ALARM_CREATION_FIELDS[1]}))).setNumTasks((Number)this.config.aggregationBoltTasks);
        ((BoltDeclarer)((BoltDeclarer)((BoltDeclarer)((BoltDeclarer)builder.setBolt("thresholding-bolt", (IRichBolt)new AlarmThresholdingBolt(this.config.database, this.config.kafkaProducerConfig), (Number)this.config.thresholdingBoltThreads).fieldsGrouping("aggregation-bolt", new Fields(new String[]{MetricAggregationBolt.FIELDS[0]}))).fieldsGrouping("event-bolt", "alarm-events", new Fields(new String[]{EventProcessingBolt.ALARM_EVENT_STREAM_FIELDS[1]}))).allGrouping("event-bolt", "alarm-definition-events")).allGrouping("event-bolt", "metric-sub-alarm-events")).setNumTasks((Number)this.config.thresholdingBoltTasks);
        return builder.createTopology();
    }
}

