/*
 * Decompiled with CFR 0.152.
 */
package monasca.persister.pipeline.event;

import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import io.dropwizard.setup.Environment;
import monasca.persister.configuration.PipelineConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for pipeline handlers that accumulate events and periodically
 * flush them to a repository.
 *
 * <p>A flush is triggered either when the number of accumulated events
 * (the running sum of the values returned by {@link #process(Object)})
 * reaches the configured batch size, or when a heartbeat ({@code null} event)
 * arrives after more than the configured maximum batch time has elapsed
 * since the previous flush.
 *
 * <p>This class performs no synchronization of its own.
 *
 * @param <T> type of event handled by this handler
 */
public abstract class FlushableHandler<T> {
    private static final Logger logger = LoggerFactory.getLogger(FlushableHandler.class);

    private static final long MILLIS_PER_SECOND = 1000L;

    private final int ordinal;
    private final int batchSize;
    private final String handlerName;
    /** Wall-clock time (epoch millis) of the last flush, or of construction if none yet. */
    private long lastFlushTimeMillis = System.currentTimeMillis();
    private final long millisBetweenFlushes;
    private final int secondsBetweenFlushes;
    /** Events accumulated since the last flush. */
    private int eventCount = 0;
    private final Environment environment;
    private final Meter processedMeter;
    private final Meter commitMeter;
    private final Timer commitTimer;

    /**
     * Creates a handler and registers its meters and timer with the
     * environment's metric registry, under names prefixed with
     * {@code baseName[ordinal]}.
     *
     * @param configuration supplies the maximum batch time, in seconds
     * @param environment   supplies the metric registry
     * @param ordinal       index of this handler; used in the handler name and in logging
     * @param batchSize     accumulated event count at which a flush is triggered
     * @param baseName      prefix used to build the handler name
     */
    protected FlushableHandler(PipelineConfiguration configuration, Environment environment, int ordinal, int batchSize, String baseName) {
        this.handlerName = String.format("%s[%d]", baseName, ordinal);
        this.environment = environment;
        this.processedMeter = this.environment.metrics().meter(this.handlerName + "." + "events-processed-processedMeter");
        this.commitMeter = this.environment.metrics().meter(this.handlerName + "." + "commits-executed-processedMeter");
        this.commitTimer = this.environment.metrics().timer(this.handlerName + "." + "total-commit-and-flush-timer");
        this.secondsBetweenFlushes = configuration.getMaxBatchTime();
        // Multiply as long so a large configured interval cannot overflow int.
        this.millisBetweenFlushes = MILLIS_PER_SECOND * this.secondsBetweenFlushes;
        this.ordinal = ordinal;
        this.batchSize = batchSize;
    }

    /** Writes everything accumulated so far to the underlying repository. */
    protected abstract void flushRepository();

    /**
     * Processes a single non-null event.
     *
     * @param var1 the event to process
     * @return the amount to add to the accumulated event count
     * @throws Exception if the event cannot be processed
     */
    protected abstract int process(T var1) throws Exception;

    /**
     * Handles one event, or a heartbeat when {@code event} is {@code null}.
     *
     * @param event the event to process, or {@code null} for a heartbeat
     * @return {@code true} if this call caused a flush, {@code false} otherwise
     * @throws Exception if {@link #process(Object)} fails
     */
    public boolean onEvent(T event) throws Exception {
        if (event == null) {
            return this.onHeartbeat();
        }
        this.processedMeter.mark();
        logger.debug("Ordinal: {} Event: {}", this.ordinal, event);
        this.eventCount += this.process(event);
        if (this.eventCount >= this.batchSize) {
            this.flush();
            return true;
        }
        return false;
    }

    /**
     * Flushes if more than the configured interval has elapsed since the last flush.
     *
     * @return {@code true} if a flush was performed
     */
    private boolean onHeartbeat() {
        logger.debug("{} received heartbeat message, flush every {} seconds.", this.handlerName, this.secondsBetweenFlushes);
        long elapsedMillis = System.currentTimeMillis() - this.lastFlushTimeMillis;
        long elapsedSeconds = elapsedMillis / MILLIS_PER_SECOND;
        if (elapsedMillis > this.millisBetweenFlushes) {
            logger.debug("{}: {} seconds since last flush. Flushing to repository now.", this.handlerName, elapsedSeconds);
            this.flush();
            return true;
        }
        logger.debug("{}: {} seconds since last flush. No need to flush at this time.", this.handlerName, elapsedSeconds);
        return false;
    }

    /**
     * Flushes the repository, records commit metrics, and resets the
     * accumulated event count and the last-flush timestamp.
     *
     * <p>{@link #flushRepository()} is invoked even when no events have been
     * counted; that case is only logged. If {@link #flushRepository()} throws,
     * the commit timer is still stopped, but the commit meter, the timestamp
     * and the event count are left untouched.
     */
    public void flush() {
        if (this.eventCount == 0) {
            logger.debug("{}: Nothing to flush", this.handlerName);
        }
        Timer.Context context = this.commitTimer.time();
        try {
            this.flushRepository();
        } finally {
            // Always stop the timer so a failing flush does not leave the context open.
            context.stop();
        }
        this.commitMeter.mark();
        this.lastFlushTimeMillis = System.currentTimeMillis();
        logger.debug("{}: Flushed {} events", this.handlerName, this.eventCount);
        this.eventCount = 0;
    }
}

