/*
 * Decompiled with CFR 0.152.
 */
package monasca.persister.consumer;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import monasca.persister.consumer.KafkaChannel;
import monasca.persister.consumer.KafkaConsumerRunnableBasic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class that runs a single {@link KafkaConsumerRunnableBasic} on its own
 * single-thread executor and shuts it down again on request.
 *
 * <p>Subclasses supply the concrete runnable through
 * {@link #createRunnable(KafkaChannel, int)}.
 *
 * @param <T> type parameter forwarded to {@link KafkaConsumerRunnableBasic}
 */
public abstract class KafkaConsumer<T> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    /** Maximum time, in seconds, that {@link #stop()} waits for the executor to terminate. */
    private static final int WAIT_TIME = 10;

    private ExecutorService executorService;
    private final KafkaChannel kafkaChannel;
    private final int threadNum;

    /** Runnable created by {@link #start()}; {@code null} until {@code start()} has run. */
    private KafkaConsumerRunnableBasic<T> kafkaConsumerRunnableBasic;

    /**
     * Stores the collaborators that are later handed to
     * {@link #createRunnable(KafkaChannel, int)}.
     *
     * @param kafkaChannel channel passed through to the runnable factory
     * @param threadNum    thread number passed through to the runnable factory
     */
    public KafkaConsumer(KafkaChannel kafkaChannel, int threadNum) {
        this.kafkaChannel = kafkaChannel;
        this.threadNum = threadNum;
    }

    /**
     * Creates the runnable that {@link #start()} submits to the executor.
     *
     * @param var1 the channel given to the constructor
     * @param var2 the thread number given to the constructor
     * @return the runnable to execute
     */
    protected abstract KafkaConsumerRunnableBasic<T> createRunnable(KafkaChannel var1, int var2);

    /**
     * Creates a single-thread executor, builds the runnable and submits it.
     *
     * <p>Each call creates a fresh executor and runnable; an executor left over
     * from an earlier call is not shut down here.
     */
    public void start() {
        this.executorService = Executors.newFixedThreadPool(1);
        // Assign the FIELD (not a shadowing local) so that stop() can reach the runnable.
        this.kafkaConsumerRunnableBasic = this.createRunnable(this.kafkaChannel, this.threadNum);
        this.executorService.submit(this.kafkaConsumerRunnableBasic);
    }

    /**
     * Asks the runnable to stop, then shuts the executor down and waits up to
     * {@link #WAIT_TIME} seconds for it to terminate.
     *
     * <p>Safe to call when {@link #start()} was never invoked: both the runnable
     * and the executor are null-checked. If the wait is interrupted, the
     * interrupt status of the calling thread is restored.
     */
    public void stop() {
        if (this.kafkaConsumerRunnableBasic != null) {
            this.kafkaConsumerRunnableBasic.stop();
        }
        if (this.executorService != null) {
            this.executorService.shutdown();
            try {
                if (!this.executorService.awaitTermination(WAIT_TIME, TimeUnit.SECONDS)) {
                    logger.warn("Did not shut down in {} seconds", WAIT_TIME);
                }
            } catch (InterruptedException e) {
                logger.info("awaitTerminiation interrupted", e);
                // Preserve the interrupt so callers further up the stack can observe it.
                Thread.currentThread().interrupt();
            }
        }
    }
}

