/*
 * Decompiled with CFR 0.152.
 */
package monasca.thresh.infrastructure.thresholding;

import com.google.inject.Module;
import java.util.List;
import java.util.Map;
import monasca.common.model.alarm.AlarmSubExpression;
import monasca.common.model.event.AlarmDefinitionCreatedEvent;
import monasca.common.model.event.AlarmDefinitionDeletedEvent;
import monasca.common.model.event.AlarmDefinitionUpdatedEvent;
import monasca.common.model.event.AlarmDeletedEvent;
import monasca.common.model.event.AlarmUpdatedEvent;
import monasca.common.model.metric.MetricDefinition;
import monasca.common.util.Injector;
import monasca.thresh.domain.model.MetricDefinitionAndTenantId;
import monasca.thresh.domain.model.SubExpression;
import monasca.thresh.domain.model.TenantIdAndMetricName;
import monasca.thresh.domain.service.AlarmDAO;
import monasca.thresh.infrastructure.persistence.PersistenceModule;
import monasca.thresh.infrastructure.thresholding.DataSourceFactory;
import monasca.thresh.utils.Logging;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Storm bolt that receives alarm and alarm-definition lifecycle events and
 * re-emits them on dedicated output streams.
 *
 * <p>Four streams are declared:
 * <ul>
 *   <li>{@link #ALARM_EVENT_STREAM_ID} - alarm deleted/updated notifications,</li>
 *   <li>{@link #METRIC_ALARM_EVENT_STREAM_ID} - one message per (alarmed metric,
 *       matching sub-alarm) pair,</li>
 *   <li>{@link #METRIC_SUB_ALARM_EVENT_STREAM_ID} - changed sub-expressions of an
 *       alarm definition,</li>
 *   <li>{@link #ALARM_DEFINITION_EVENT_STREAM_ID} - alarm-definition
 *       created/updated/deleted notifications.</li>
 * </ul>
 *
 * <p>Every incoming tuple is acked, whether or not processing succeeded.
 */
public class EventProcessingBolt
extends BaseRichBolt {
    private static final long serialVersionUID = 897171858708109378L;

    /** Stream carrying alarm-level events. */
    public static final String ALARM_EVENT_STREAM_ID = "alarm-events";
    /** Stream carrying per-metric sub-alarm events. */
    public static final String METRIC_ALARM_EVENT_STREAM_ID = "metric-alarm-events";
    /** Stream carrying updated sub-expressions. */
    public static final String METRIC_SUB_ALARM_EVENT_STREAM_ID = "metric-sub-alarm-events";
    /** Stream carrying alarm-definition events. */
    public static final String ALARM_DEFINITION_EVENT_STREAM_ID = "alarm-definition-events";

    public static final String[] ALARM_EVENT_STREAM_FIELDS = new String[]{"eventType", "alarmId", "alarm"};
    public static final String[] METRIC_ALARM_EVENT_STREAM_FIELDS = new String[]{"eventType", "tenantIdAndMetricName", "metricDefinitionAndTenantId", "alarmDefinitionId", "subAlarmId"};
    public static final String[] METRIC_SUB_ALARM_EVENT_STREAM_FIELDS = new String[]{"eventType", "subExpression", "alarmDefinitionId"};
    public static final String[] ALARM_DEFINITION_EVENT_FIELDS = new String[]{"eventType", "argument"};

    /** Values emitted in the {@code eventType} field of the output streams. */
    public static final String CREATED = "created";
    public static final String DELETED = "deleted";
    public static final String UPDATED = "updated";
    public static final String RESEND = "resend";

    // Created in prepare(); transient so it is not part of the serialized bolt.
    private transient Logger logger;
    private OutputCollector collector;
    // Either supplied through the constructor or resolved from the Injector in prepare().
    private AlarmDAO alarmDAO;
    private DataSourceFactory dbConfig;

    /**
     * Creates a bolt whose {@link AlarmDAO} is resolved in {@link #prepare}
     * from a {@link PersistenceModule} built with the given configuration.
     *
     * @param dbConfig data source configuration handed to the persistence module
     */
    public EventProcessingBolt(DataSourceFactory dbConfig) {
        this.dbConfig = dbConfig;
    }

    /**
     * Creates a bolt that uses the supplied DAO directly; {@link #prepare}
     * will then skip the Injector lookup.
     *
     * @param alarmDAO DAO used to persist sub-expression updates
     */
    public EventProcessingBolt(AlarmDAO alarmDAO) {
        this.alarmDAO = alarmDAO;
    }

    /**
     * Declares the four output streams and their field layouts.
     *
     * @param declarer Storm declarer to register the streams with
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream(ALARM_EVENT_STREAM_ID, new Fields(ALARM_EVENT_STREAM_FIELDS));
        declarer.declareStream(METRIC_ALARM_EVENT_STREAM_ID, new Fields(METRIC_ALARM_EVENT_STREAM_FIELDS));
        declarer.declareStream(METRIC_SUB_ALARM_EVENT_STREAM_ID, new Fields(METRIC_SUB_ALARM_EVENT_STREAM_FIELDS));
        declarer.declareStream(ALARM_DEFINITION_EVENT_STREAM_ID, new Fields(ALARM_DEFINITION_EVENT_FIELDS));
    }

    /**
     * Dispatches the event held in the tuple's first value to the matching
     * {@code handle} overload. Events of any other type are ignored.
     *
     * <p>Any exception thrown while handling is logged rather than propagated,
     * and the tuple is acked in all cases.
     *
     * @param tuple incoming tuple; value 0 is the event object
     */
    public void execute(Tuple tuple) {
        try {
            Object event = tuple.getValue(0);
            logger.trace("Received event for processing {}", event);
            if (event instanceof AlarmDefinitionCreatedEvent) {
                handle((AlarmDefinitionCreatedEvent) event);
            } else if (event instanceof AlarmDefinitionUpdatedEvent) {
                handle((AlarmDefinitionUpdatedEvent) event);
            } else if (event instanceof AlarmDefinitionDeletedEvent) {
                handle((AlarmDefinitionDeletedEvent) event);
            } else if (event instanceof AlarmDeletedEvent) {
                handle((AlarmDeletedEvent) event);
            } else if (event instanceof AlarmUpdatedEvent) {
                handle((AlarmUpdatedEvent) event);
            }
        } catch (Exception e) {
            // The trailing Throwable is logged by SLF4J as the exception, not as a format argument.
            logger.error("Error processing tuple {}", tuple, e);
        } finally {
            collector.ack(tuple);
        }
    }

    /**
     * Initializes the logger and collector, and resolves the {@link AlarmDAO}
     * through the {@link Injector} when none was passed to the constructor.
     *
     * @param stormConf topology configuration (unused here)
     * @param context   topology context, used to derive the logger category
     * @param collector collector used for emitting and acking
     */
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.logger = LoggerFactory.getLogger(Logging.categoryFor(getClass(), context));
        this.logger.info("Preparing");
        this.collector = collector;
        if (this.alarmDAO == null) {
            Injector.registerIfNotBound(AlarmDAO.class, new Module[]{new PersistenceModule(this.dbConfig)});
            this.alarmDAO = Injector.getInstance(AlarmDAO.class);
        }
    }

    /** Forwards a newly created alarm definition on the alarm-definition stream. */
    void handle(AlarmDefinitionCreatedEvent event) {
        collector.emit(ALARM_DEFINITION_EVENT_STREAM_ID, new Values(CREATED, event));
    }

    /** Forwards a deleted alarm definition on the alarm-definition stream. */
    void handle(AlarmDefinitionDeletedEvent event) {
        collector.emit(ALARM_DEFINITION_EVENT_STREAM_ID, new Values(DELETED, event));
    }

    /**
     * Emits a DELETED message for every (alarmed metric, matching sub-alarm)
     * pair of the alarm, followed by a DELETED message for the alarm itself.
     */
    void handle(AlarmDeletedEvent event) {
        logger.debug("Alarm {} deleted", event.alarmId);
        processSubAlarms(DELETED, event.tenantId, event.alarmDefinitionId, event.alarmMetrics, event.subAlarms);
        collector.emit(ALARM_EVENT_STREAM_ID, new Values(DELETED, event.alarmId, event));
    }

    /**
     * Sends {@code command} for each sub-alarm whose metric definition is
     * satisfied by one of the alarmed metrics.
     *
     * @param command           event type to emit (e.g. {@link #DELETED}, {@link #RESEND})
     * @param tenantId          tenant owning the alarm
     * @param alarmDefinitionId id of the alarm's definition
     * @param alarmMetrics      metrics associated with the alarm
     * @param subAlarms         sub-alarm id to sub-expression mapping
     */
    private void processSubAlarms(String command, String tenantId, String alarmDefinitionId, List<MetricDefinition> alarmMetrics, Map<String, AlarmSubExpression> subAlarms) {
        for (MetricDefinition alarmedMetric : alarmMetrics) {
            for (Map.Entry<String, AlarmSubExpression> subAlarm : subAlarms.entrySet()) {
                if (isSuperSet(alarmedMetric, subAlarm.getValue().getMetricDefinition())) {
                    sendSubAlarmMsg(command, subAlarm.getKey(), tenantId, alarmDefinitionId, alarmedMetric);
                }
            }
        }
    }

    /**
     * Tells whether {@code toMatch} has the same name as {@code match} and
     * carries every dimension of {@code match} with an equal value. Extra
     * dimensions on {@code toMatch} are allowed.
     *
     * @param toMatch candidate metric definition
     * @param match   metric definition whose dimensions are required
     * @return {@code true} if the candidate satisfies all required dimensions
     */
    private boolean isSuperSet(MetricDefinition toMatch, MetricDefinition match) {
        if (!toMatch.name.equals(match.name)) {
            return false;
        }
        if (match.dimensions == null || match.dimensions.isEmpty()) {
            // Nothing is required beyond the name.
            return true;
        }
        if (toMatch.dimensions == null) {
            // Dimensions are required but the candidate has none.
            return false;
        }
        // Walk the REQUIRED dimensions: a key missing from the candidate must
        // fail the check, otherwise this would not be a superset test.
        for (Map.Entry<String, String> required : match.dimensions.entrySet()) {
            String actual = toMatch.dimensions.get(required.getKey());
            if (actual == null || !actual.equals(required.getValue())) {
                return false;
            }
        }
        return true;
    }

    /** Emits one message on the metric-alarm stream for the given metric and sub-alarm. */
    private void sendSubAlarmMsg(String command, String subAlarmId, String tenantId, String alarmDefinitionId, MetricDefinition metricDef) {
        collector.emit(METRIC_ALARM_EVENT_STREAM_ID,
                new Values(command,
                        new TenantIdAndMetricName(tenantId, metricDef.name),
                        new MetricDefinitionAndTenantId(metricDef, tenantId),
                        alarmDefinitionId,
                        subAlarmId));
    }

    /**
     * Persists each changed sub-expression through the DAO, emits it on the
     * sub-alarm stream, and finally forwards the definition update itself.
     */
    void handle(AlarmDefinitionUpdatedEvent event) {
        for (Map.Entry<String, AlarmSubExpression> changed : event.changedSubExpressions.entrySet()) {
            String subExpressionId = changed.getKey();
            AlarmSubExpression subExpression = changed.getValue();
            int updated = alarmDAO.updateSubAlarmExpressions(subExpressionId, subExpression);
            logger.info("Updated {} SubAlarms with new AlarmSubExpression {} {}", updated, subExpressionId, subExpression);
            collector.emit(METRIC_SUB_ALARM_EVENT_STREAM_ID,
                    new Values(UPDATED, new SubExpression(subExpressionId, subExpression), event.alarmDefinitionId));
        }
        collector.emit(ALARM_DEFINITION_EVENT_STREAM_ID, new Values(UPDATED, event));
    }

    /**
     * Handles an alarm update. When the state did not change the event is
     * ignored; otherwise a RESEND is emitted for each matching sub-alarm and
     * the update is forwarded on the alarm stream.
     */
    void handle(AlarmUpdatedEvent event) {
        if (event.oldAlarmState.equals(event.alarmState)) {
            logger.info("No state change for {}, ignoring", event.alarmId);
            // Actually ignore the event, as the log message states.
            return;
        }
        logger.info("Received AlarmUpdatedEvent {}", event);
        processSubAlarms(RESEND, event.tenantId, event.alarmDefinitionId, event.alarmMetrics, event.subAlarms);
        collector.emit(ALARM_EVENT_STREAM_ID, new Values(UPDATED, event.alarmId, event));
    }
}

