/*
 * Decompiled with CFR 0.152.
 */
package com.taosdata.jdbc.tmq;

import com.taosdata.jdbc.common.Consumer;
import com.taosdata.jdbc.enums.TmqMessageType;
import com.taosdata.jdbc.tmq.Assignment;
import com.taosdata.jdbc.tmq.ConsumerRecord;
import com.taosdata.jdbc.tmq.ConsumerRecords;
import com.taosdata.jdbc.tmq.Deserializer;
import com.taosdata.jdbc.tmq.OffsetAndMetadata;
import com.taosdata.jdbc.tmq.OffsetCommitCallback;
import com.taosdata.jdbc.tmq.OffsetWaitCallback;
import com.taosdata.jdbc.tmq.TMQConnector;
import com.taosdata.jdbc.tmq.TMQResultSet;
import com.taosdata.jdbc.tmq.TopicPartition;
import com.taosdata.jdbc.utils.StringUtils;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

public class JNIConsumer<V>
implements Consumer<V> {
    private final TMQConnector connector = new TMQConnector();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void create(Properties properties) throws SQLException {
        if (!StringUtils.isEmpty(properties.getProperty("td.connect.ip")) && properties.getProperty("td.connect.ip").startsWith("[")) {
            String ip = properties.getProperty("td.connect.ip");
            if (ip.endsWith("]")) {
                ip = ip.substring(1, ip.length() - 1);
            }
            properties.setProperty("td.connect.ip", ip);
        }
        long config = this.connector.createConfig(properties);
        try {
            this.connector.createConsumer(config);
        }
        finally {
            this.connector.destroyConf(config);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void subscribe(Collection<String> topics) throws SQLException {
        long topicPointer = 0L;
        try {
            topicPointer = this.connector.createTopic(topics);
            this.connector.subscribe(topicPointer);
        }
        finally {
            if (topicPointer != 0L) {
                this.connector.destroyTopic(topicPointer);
            }
        }
    }

    @Override
    public void unsubscribe() throws SQLException {
        this.connector.unsubscribe();
    }

    @Override
    public Set<String> subscription() throws SQLException {
        return this.connector.subscription();
    }

    @Override
    public ConsumerRecords<V> poll(Duration timeout, Deserializer<V> deserializer) throws SQLException {
        long resultSet = this.connector.poll(timeout.toMillis());
        if (resultSet == 0L || resultSet == 9079L) {
            return ConsumerRecords.emptyRecord();
        }
        int timestampPrecision = this.connector.getResultTimePrecision(resultSet);
        ConsumerRecords records = new ConsumerRecords();
        String topic = this.connector.getTopicName(resultSet);
        String dbName = this.connector.getDbName(resultSet);
        int vGroupId = this.connector.getVgroupId(resultSet);
        long offset = this.connector.getOffset(resultSet);
        String tableName = this.connector.getTableName(resultSet);
        TopicPartition tp = new TopicPartition(topic, vGroupId);
        try (TMQResultSet rs = new TMQResultSet(this.connector, resultSet, timestampPrecision, dbName, tableName);){
            while (rs.next()) {
                V v = deserializer.deserialize(rs, topic, dbName);
                ConsumerRecord r = new ConsumerRecord.Builder().topic(topic).dbName(dbName).vGroupId(vGroupId).offset(offset).messageType(TmqMessageType.TMQ_RES_DATA).value(v).build();
                records.put(tp, r);
            }
        }
        return records;
    }

    @Override
    public void commitAsync(OffsetCommitCallback<V> callback) throws SQLException {
        OffsetWaitCallback<V> call = new OffsetWaitCallback<V>(this.getAllConsumed(), this, callback);
        this.connector.asyncCommit(call);
    }

    private Map<TopicPartition, OffsetAndMetadata> getAllConsumed() throws SQLException {
        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        this.subscription().forEach(topic -> {
            List<Assignment> topicAssignment = this.connector.getTopicAssignment((String)topic);
            topicAssignment.forEach(assignment -> {
                TopicPartition tp = new TopicPartition((String)topic, assignment.getVgId());
                OffsetAndMetadata metadata = new OffsetAndMetadata(assignment.getCurrentOffset());
                offsets.put(tp, metadata);
            });
        });
        return offsets;
    }

    @Override
    public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback<V> callback) {
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
            HashMap<TopicPartition, OffsetAndMetadata> offset = new HashMap<TopicPartition, OffsetAndMetadata>();
            offset.put(entry.getKey(), entry.getValue());
            this.connector.asyncCommit(entry.getKey().getTopic(), entry.getKey().getVGroupId(), entry.getValue().offset(), new OffsetWaitCallback<V>(offset, this, callback));
        }
    }

    @Override
    public void commitSync() throws SQLException {
        this.connector.syncCommit();
    }

    @Override
    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) throws SQLException {
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
            this.connector.commitOffsetSync(entry.getKey().getTopic(), entry.getKey().getVGroupId(), entry.getValue().offset());
        }
    }

    @Override
    public void seek(TopicPartition partition, long offset) {
        this.connector.seek(partition.getTopic(), partition.getVGroupId(), offset);
    }

    @Override
    public long position(TopicPartition partition) throws SQLException {
        return this.connector.position(partition.getTopic(), partition.getVGroupId());
    }

    @Override
    public Map<TopicPartition, Long> position(String topic) throws SQLException {
        List collect = this.connector.getTopicAssignment(topic).stream().map(a -> new TopicPartition(topic, a.getVgId())).collect(Collectors.toList());
        HashMap<TopicPartition, Long> map = new HashMap<TopicPartition, Long>();
        for (TopicPartition topicPartition : collect) {
            map.put(topicPartition, this.position(topicPartition));
        }
        return map;
    }

    @Override
    public Map<TopicPartition, Long> beginningOffsets(String topic) {
        return this.connector.getTopicAssignment(topic).stream().collect(HashMap::new, (m, a) -> m.put(new TopicPartition(topic, a.getVgId()), a.getBegin()), HashMap::putAll);
    }

    @Override
    public Map<TopicPartition, Long> endOffsets(String topic) {
        return this.connector.getTopicAssignment(topic).stream().collect(HashMap::new, (m, a) -> m.put(new TopicPartition(topic, a.getVgId()), a.getEnd()), HashMap::putAll);
    }

    @Override
    public void seekToBeginning(Collection<TopicPartition> partitions) throws SQLException {
        HashMap beginningOffsets = new HashMap();
        for (TopicPartition partition : partitions) {
            if (beginningOffsets.containsKey(partition)) {
                Long aLong = (Long)beginningOffsets.get(partition);
                this.seek(partition, aLong);
                continue;
            }
            this.beginningOffsets(partition.getTopic()).forEach((tp, offset) -> {
                if (tp.getVGroupId() == partition.getVGroupId()) {
                    this.seek((TopicPartition)tp, (long)offset);
                } else {
                    beginningOffsets.put(tp, offset);
                }
            });
        }
    }

    @Override
    public void seekToEnd(Collection<TopicPartition> partitions) throws SQLException {
        HashMap endOffsets = new HashMap();
        for (TopicPartition partition : partitions) {
            if (endOffsets.containsKey(partition)) {
                Long aLong = (Long)endOffsets.get(partition);
                this.seek(partition, aLong);
                continue;
            }
            this.endOffsets(partition.getTopic()).forEach((tp, offset) -> {
                if (tp.getVGroupId() == partition.getVGroupId()) {
                    this.seek((TopicPartition)tp, (long)offset);
                } else {
                    endOffsets.put(tp, offset);
                }
            });
        }
    }

    @Override
    public Set<TopicPartition> assignment() throws SQLException {
        return this.subscription().stream().map(topic -> {
            List<Assignment> topicAssignment = this.connector.getTopicAssignment((String)topic);
            return topicAssignment.stream().map(a -> new TopicPartition((String)topic, a.getVgId())).collect(Collectors.toList());
        }).flatMap(Collection::stream).collect(Collectors.toSet());
    }

    @Override
    public OffsetAndMetadata committed(TopicPartition partition) throws SQLException {
        long l = this.connector.committed(partition.getTopic(), partition.getVGroupId());
        return new OffsetAndMetadata(l, null);
    }

    @Override
    public Map<TopicPartition, OffsetAndMetadata> committed(Set<TopicPartition> partitions) throws SQLException {
        HashMap<TopicPartition, OffsetAndMetadata> map = new HashMap<TopicPartition, OffsetAndMetadata>();
        for (TopicPartition partition : partitions) {
            map.put(partition, this.committed(partition));
        }
        return map;
    }

    @Override
    public void close() throws SQLException {
        this.connector.closeConsumer();
    }

    public String getErrMsg(int code) {
        return this.connector.getErrMsg(code);
    }
}

