/*
 * Decompiled with CFR 0.152.
 */
package monasca.persister.consumer;

import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import monasca.persister.configuration.KafkaConfiguration;
import monasca.persister.configuration.MonPersisterConfiguration;
import monasca.persister.configuration.PipelineConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaChannel {
    private static final String KAFKA_CONFIGURATION = "Kafka configuration:";
    private static final Logger logger = LoggerFactory.getLogger(KafkaChannel.class);
    private final String topic;
    private final ConsumerConnector consumerConnector;
    private final int threadNum;

    @Inject
    public KafkaChannel(@Assisted MonPersisterConfiguration configuration, @Assisted PipelineConfiguration pipelineConfiguration, @Assisted int threadNum) {
        this.topic = pipelineConfiguration.getTopic();
        this.threadNum = threadNum;
        Properties kafkaProperties = this.createKafkaProperties(configuration.getKafkaConfiguration(), pipelineConfiguration);
        this.consumerConnector = Consumer.createJavaConsumerConnector((ConsumerConfig)this.createConsumerConfig(kafkaProperties));
    }

    public final void markRead() {
        this.consumerConnector.commitOffsets();
    }

    public KafkaStream<byte[], byte[]> getKafkaStream() {
        HashMap<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(this.topic, 1);
        Map streamMap = this.consumerConnector.createMessageStreams(topicCountMap);
        List streams = (List)streamMap.values().iterator().next();
        if (streams.size() != 1) {
            throw new IllegalStateException(String.format("Expected only one stream but instead there are %d", streams.size()));
        }
        return (KafkaStream)streams.get(0);
    }

    public void stop() {
        this.consumerConnector.shutdown();
    }

    private ConsumerConfig createConsumerConfig(Properties kafkaProperties) {
        return new ConsumerConfig(kafkaProperties);
    }

    private Properties createKafkaProperties(KafkaConfiguration kafkaConfiguration, PipelineConfiguration pipelineConfiguration) {
        Properties properties = new Properties();
        properties.put("group.id", pipelineConfiguration.getGroupId());
        properties.put("zookeeper.connect", kafkaConfiguration.getZookeeperConnect());
        properties.put("consumer.id", String.format("%s_%d", pipelineConfiguration.getConsumerId(), this.threadNum));
        properties.put("socket.timeout.ms", kafkaConfiguration.getSocketTimeoutMs().toString());
        properties.put("socket.receive.buffer.bytes", kafkaConfiguration.getSocketReceiveBufferBytes().toString());
        properties.put("fetch.message.max.bytes", kafkaConfiguration.getFetchMessageMaxBytes().toString());
        properties.put("auto.commit.enable", "false");
        properties.put("queued.max.message.chunks", kafkaConfiguration.getQueuedMaxMessageChunks().toString());
        properties.put("rebalance.max.retries", kafkaConfiguration.getRebalanceMaxRetries().toString());
        properties.put("fetch.min.bytes", kafkaConfiguration.getFetchMinBytes().toString());
        properties.put("fetch.wait.max.ms", kafkaConfiguration.getFetchWaitMaxMs().toString());
        properties.put("rebalance.backoff.ms", kafkaConfiguration.getRebalanceBackoffMs().toString());
        properties.put("refresh.leader.backoff.ms", kafkaConfiguration.getRefreshLeaderBackoffMs().toString());
        properties.put("auto.offset.reset", kafkaConfiguration.getAutoOffsetReset());
        properties.put("consumer.timeout.ms", kafkaConfiguration.getConsumerTimeoutMs().toString());
        properties.put("client.id", String.format("%s_%d", pipelineConfiguration.getClientId(), this.threadNum));
        properties.put("zookeeper.session.timeout.ms", kafkaConfiguration.getZookeeperSessionTimeoutMs().toString());
        properties.put("zookeeper.connection.timeout.ms", kafkaConfiguration.getZookeeperConnectionTimeoutMs().toString());
        properties.put("zookeeper.sync.time.ms", kafkaConfiguration.getZookeeperSyncTimeMs().toString());
        for (String key : properties.stringPropertyNames()) {
            logger.info("Kafka configuration: " + key + " = " + properties.getProperty(key));
        }
        return properties;
    }
}

