package cn.hangar.agp.module.mq.kafka;

import cn.hangar.agp.platform.core.app.SysException;
import cn.hangar.agp.platform.core.log.Logger;
import cn.hangar.agp.platform.core.mq.BaseMQManager;
import cn.hangar.agp.platform.core.mq.CallBack;
import cn.hangar.agp.platform.core.mq.Consumer;
import cn.hangar.agp.platform.core.mq.Producer;
import cn.hangar.agp.platform.utils.CollectionUtil;
import cn.hangar.agp.platform.utils.KeyValue;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:cn/hangar/agp/module/mq/kafka/KafkaMQManager.class */
public class KafkaMQManager<T> extends BaseMQManager<KeyValue<String, T>> {

    /* loaded from: input_file:cn/hangar/agp/module/mq/kafka/KafkaMQManager$KafkaConsumer.class */
    /**
     * {@link Consumer} implementation backed by an Apache Kafka client.
     *
     * <p>Records can be fetched directly through the {@code poll} overloads, or pushed to
     * {@link CallBack}s registered with {@code subscribe(topic, callBack)}; the latter are served
     * by a background poll thread owned by {@link SubscribePoolManager}.
     *
     * <p>NOTE(review): the Kafka client is documented as not safe for concurrent use. Calling
     * {@code poll}/{@code commit}/{@code subscribe} directly while callback subscriptions are
     * active shares the client with the background poll thread — confirm callers never mix both.
     */
    class KafkaConsumer implements Consumer {
        /** Poll timeout, in milliseconds, used by {@link #poll(String, CallBack)}. */
        private static final long DEFAULT_POLL_TIMEOUT_MS = 10000L;

        /** Configuration the Kafka client was created from; reused by {@link #start()}. */
        Map configs;
        /** Underlying Kafka client; {@code null} while this consumer is closed. */
        org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer;
        /** Created lazily on the first callback subscription; {@code null} until then. */
        SubscribePoolManager subscribePoolManager;

        /**
         * Owns the background thread that polls every registered topic and hands each non-empty
         * batch to the callbacks of that topic through a bounded thread pool.
         */
        public class SubscribePoolManager {
            private static final int CORE_POOL_SIZE = 4;
            private static final int MAX_POOL_SIZE = 10;
            private static final int KEEP_ALIVE_TIME = 0;
            private static final int WORK_QUEUE_SIZE = 10;
            /** Timeout, in milliseconds, of each per-topic poll issued by the poll thread. */
            private static final long POLL_TIMEOUT_MS = 200L;
            /** Pause, in milliseconds, between checks while no topic is registered. */
            private static final long IDLE_SLEEP_MS = 200L;
            /** Upper bound, in milliseconds, that {@link #close()} waits for the poll thread. */
            private static final long STOP_JOIN_TIMEOUT_MS = 5000L;

            Consumer consumer;
            /**
             * Topic name to registered callbacks. Both levels are concurrent collections because
             * the poll thread iterates them while other threads add and remove callbacks.
             */
            final Map<String, CopyOnWriteArrayList<CallBack>> topics = new ConcurrentHashMap<>();
            Thread thread;
            final ThreadPoolExecutor executor;
            final RejectedExecutionHandler handler;
            /** Stop flag for the poll loop; volatile because it is written by other threads. */
            volatile boolean interrupt = false;
            final Runnable subscribeThread = new Runnable() {
                @Override
                public void run() {
                    SubscribePoolManager.this.pollLoop();
                }
            };

            /** Pool task that delivers one polled batch to one callback. */
            class QueryThread implements Runnable {
                Object qe;
                CallBack call;

                public QueryThread(Object obj, CallBack callBack) {
                    this.qe = obj;
                    this.call = callBack;
                }

                /** Invokes the callback; an exception thrown by it is reported to its {@code failure}. */
                @Override
                public void run() {
                    try {
                        this.call.call(this.qe);
                    } catch (Exception e) {
                        this.call.failure(e);
                    }
                }
            }

            /**
             * @param consumer consumer whose {@code poll(topic, timeout)} feeds the callbacks
             */
            public SubscribePoolManager(Consumer consumer) {
                // AbortPolicy raises RejectedExecutionException when the pool and its queue are
                // full, so dispatch() can tell the affected callback instead of the batch
                // vanishing silently.
                this.handler = new ThreadPoolExecutor.AbortPolicy();
                this.executor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME,
                        TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(WORK_QUEUE_SIZE), this.handler);
                this.consumer = consumer;
            }

            /**
             * Body of the poll thread: polls each registered topic in turn until the stop flag
             * is set.
             */
            private void pollLoop() {
                try {
                    while (!this.interrupt) {
                        if (this.topics.isEmpty()) {
                            // Nothing to poll: back off instead of spinning on an empty map.
                            Thread.sleep(IDLE_SLEEP_MS);
                            continue;
                        }
                        for (Map.Entry<String, CopyOnWriteArrayList<CallBack>> entry : this.topics.entrySet()) {
                            if (this.interrupt) {
                                return;
                            }
                            dispatch(this.consumer.poll(entry.getKey(), POLL_TIMEOUT_MS), entry.getValue());
                        }
                    }
                } catch (InterruptedException e) {
                    // Interrupted while idle: keep the interrupt status and let the thread end.
                    Thread.currentThread().interrupt();
                } catch (RuntimeException e) {
                    // close() interrupts a poll in progress on purpose; only failures that are
                    // not part of a shutdown are propagated.
                    if (!this.interrupt) {
                        throw e;
                    }
                }
            }

            /**
             * Submits one pool task per callback for a polled batch; empty batches are skipped.
             * A callback whose task is rejected by the saturated pool is notified through
             * {@code failure} and does not receive the batch.
             */
            private void dispatch(List records, List<CallBack> callbacks) {
                if (records == null || records.isEmpty()) {
                    return;
                }
                for (CallBack callback : callbacks) {
                    try {
                        this.executor.execute(new QueryThread(records, callback));
                    } catch (RejectedExecutionException e) {
                        callback.failure(e);
                    }
                }
            }

            /**
             * Stops the poll thread and waits (bounded) for it to finish so that the caller can
             * safely close the underlying client afterwards. Safe to call when the thread was
             * never started.
             */
            public void close() {
                // Raise the flag before interrupting so the woken loop sees the stop request.
                this.interrupt = true;
                final Thread worker;
                synchronized (this) {
                    worker = this.thread;
                }
                if (worker == null || worker == Thread.currentThread()) {
                    return;
                }
                worker.interrupt();
                try {
                    worker.join(STOP_JOIN_TIMEOUT_MS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }

            /** Clears the stop flag and makes sure a poll thread is running. */
            public void start() {
                // Reset the flag before the thread starts; otherwise the new thread could read
                // the stale value and exit immediately.
                this.interrupt = false;
                ensureThread();
            }

            /** Starts a new poll thread unless one is already alive. */
            private synchronized void ensureThread() {
                if (this.thread == null || !this.thread.isAlive()) {
                    this.thread = new Thread(this.subscribeThread, "kafka-subscribe");
                    this.thread.start();
                }
            }

            /**
             * Registers {@code callBack} for topic {@code str} (duplicates are ignored) and makes
             * sure the poll thread is running unless this manager has been closed.
             */
            public void addCall(String str, CallBack callBack) {
                // compute() keeps "create list + add" atomic with respect to delCall().
                this.topics.compute(str, (topic, callbacks) -> {
                    CopyOnWriteArrayList<CallBack> target = callbacks;
                    if (target == null) {
                        target = new CopyOnWriteArrayList<CallBack>();
                    }
                    target.addIfAbsent(callBack);
                    return target;
                });
                if (!this.interrupt) {
                    ensureThread();
                }
            }

            /** Removes topic {@code str} together with all of its callbacks. */
            public void delCall(String str) {
                this.topics.remove(str);
            }

            /**
             * Removes {@code callBack} from topic {@code str}. The topic itself is dropped once
             * its last callback is gone, so it is no longer polled with nobody to receive the
             * records.
             */
            public void delCall(String str, CallBack callBack) {
                this.topics.computeIfPresent(str, (topic, callbacks) -> {
                    callbacks.remove(callBack);
                    return callbacks.isEmpty() ? null : callbacks;
                });
            }
        }

        /**
         * @param map client configuration handed to {@code KafkaConnectionFactory}
         */
        public KafkaConsumer(Map map) {
            this.consumer = KafkaConnectionFactory.createKafkaConsumer(map);
            this.configs = map;
        }

        /**
         * Optionally subscribes to {@code topic}, polls once and converts every record into a
         * key/value pair.
         *
         * <p>NOTE(review): a non-null topic re-subscribes the client to that single topic on
         * every call, replacing the previous subscription; with several callback topics the
         * background loop therefore switches subscription on each pass — confirm this is intended.
         */
        private List<KeyValue<String, String>> fetch(String topic, long timeoutMs) {
            if (topic != null) {
                this.consumer.subscribe(CollectionUtil.list(new String[]{topic}));
            }
            ConsumerRecords<String, String> records = this.consumer.poll(timeoutMs);
            List<KeyValue<String, String>> batch = new ArrayList<>();
            for (ConsumerRecord<String, String> record : records) {
                batch.add(new KeyValue<>(record.key(), record.value()));
            }
            return batch;
        }

        /**
         * Polls the client once.
         *
         * @param str topic to subscribe to before polling, or {@code null} to keep the current
         *            subscription
         * @param j   poll timeout in milliseconds
         * @return the polled records as {@code KeyValue} pairs; empty when nothing arrived
         */
        public List poll(String str, long j) {
            return fetch(str, j);
        }

        /**
         * Polls the client once with the default timeout and passes the resulting list to
         * {@code callBack.call}; an exception thrown by the callback is reported to
         * {@code callBack.failure}.
         *
         * @param str      topic to subscribe to before polling, or {@code null}
         * @param callBack receiver of the polled batch
         */
        public void poll(String str, CallBack callBack) {
            List<KeyValue<String, String>> batch = fetch(str, DEFAULT_POLL_TIMEOUT_MS);
            try {
                callBack.call(batch);
            } catch (Exception e) {
                callBack.failure(e);
            }
        }

        /** Returns the subscription manager, creating it on first use. */
        private synchronized SubscribePoolManager poolManager() {
            if (this.subscribePoolManager == null) {
                this.subscribePoolManager = new SubscribePoolManager(this);
            }
            return this.subscribePoolManager;
        }

        /** Registers {@code callBack} for every topic in {@code list}. */
        public void subscribe(List<String> list, CallBack callBack) {
            SubscribePoolManager manager = poolManager();
            for (String topic : list) {
                manager.addCall(topic, callBack);
            }
        }

        /** Registers {@code callBack} for topic {@code str}. */
        public void subscribe(String str, CallBack callBack) {
            poolManager().addCall(str, callBack);
        }

        /**
         * Removes {@code callBack} from topic {@code str}.
         *
         * @return always {@code true}
         */
        public boolean unSubscribe(String str, CallBack callBack) {
            if (this.subscribePoolManager != null) {
                this.subscribePoolManager.delCall(str, callBack);
            }
            return true;
        }

        /**
         * Removes every callback registered for topic {@code str}.
         *
         * @return always {@code true}
         */
        public boolean unSubscribe(String str) {
            if (this.subscribePoolManager != null) {
                this.subscribePoolManager.delCall(str);
            }
            return true;
        }

        /** @return {@code true} while the underlying Kafka client exists */
        public boolean isRunning() {
            return this.consumer != null;
        }

        /**
         * Stops callback polling (if any) and closes the Kafka client. Calling it on an already
         * closed consumer, or on one that never had callback subscriptions, is a no-op for the
         * respective part.
         */
        public void close() throws Exception {
            // Stop the poll thread first: the client must not be closed while another thread
            // is still inside poll().
            if (this.subscribePoolManager != null) {
                this.subscribePoolManager.close();
            }
            if (this.consumer != null) {
                this.consumer.close();
                this.consumer = null;
            }
        }

        /**
         * Re-creates the Kafka client if this consumer is closed and resumes callback polling
         * when callback subscriptions exist.
         */
        public void start() {
            if (this.consumer == null) {
                this.consumer = KafkaConnectionFactory.createKafkaConsumer(this.configs);
            }
            if (this.subscribePoolManager != null) {
                this.subscribePoolManager.start();
            }
        }

        /** Asynchronously commits the offsets of the underlying client. */
        public void commit() {
            this.consumer.commitAsync();
        }

        /** Subscribes the underlying client to the given topics. */
        public void subscribe(List<String> list) {
            this.consumer.subscribe(list);
        }

        /**
         * Drops the underlying client's current subscription.
         *
         * @return always {@code true}
         */
        public boolean unSubscribe() {
            this.consumer.unsubscribe();
            return true;
        }

        /**
         * Polls the current subscription once.
         *
         * @param j poll timeout in milliseconds
         */
        public List poll(long j) {
            return poll((String) null, j);
        }
    }

    /* loaded from: input_file:cn/hangar/agp/module/mq/kafka/KafkaMQManager$KafkaProducer.class */
    /**
     * {@link Producer} implementation that publishes {@code KeyValue} messages through an Apache
     * Kafka producer client.
     */
    class KafkaProducer implements Producer<KeyValue<String, T>> {
        /** Underlying Kafka client; {@code null} while this producer is closed. */
        org.apache.kafka.clients.producer.Producer<String, T> producer;
        /** Configuration the client was created from; reused by {@link #start()}. */
        Map configs;

        /**
         * @param map client configuration handed to {@code KafkaConnectionFactory}
         */
        public KafkaProducer(Map map) {
            this.producer = KafkaConnectionFactory.createProducer(map);
            this.configs = map;
        }

        /**
         * Sends the key and value of {@code keyValue} to topic {@code str} without a completion
         * callback.
         */
        public void send(String str, KeyValue<String, T> keyValue) {
            this.producer.send(new ProducerRecord<>(str, keyValue.getKey(), keyValue.getValue()));
        }

        /**
         * Sends the key and value of {@code keyValue} to topic {@code str} (no explicit
         * partition) and reports the outcome.
         *
         * @param str      target topic
         * @param keyValue message key and payload
         * @param map      not used by this implementation.
         *                 NOTE(review): presumably message headers — confirm against the
         *                 {@code Producer} contract and forward them if so.
         * @param callBack receives the record metadata on success or the exception on failure;
         *                 may be {@code null}, in which case a failure is only logged
         */
        public void send(String str, KeyValue<String, T> keyValue, Map<String, byte[]> map, CallBack callBack) {
            ProducerRecord<String, T> record =
                    new ProducerRecord<>(str, (Integer) null, keyValue.getKey(), keyValue.getValue());
            this.producer.send(record, (recordMetadata, exc) -> {
                if (callBack == null) {
                    // Nobody to notify: make sure a failed send at least shows up in the log.
                    if (exc != null) {
                        Logger.warn(getClass(), "Producer template failure: {}", new Object[]{exc.getMessage(), exc});
                    }
                    return;
                }
                if (exc == null) {
                    callBack.call(recordMetadata);
                } else {
                    callBack.failure(exc);
                }
            });
        }

        /** Closes the Kafka client; does nothing when the producer is already closed. */
        public void close() throws Exception {
            if (this.producer != null) {
                this.producer.close();
                this.producer = null;
            }
        }

        /** Re-creates the Kafka client from the stored configuration if the producer is closed. */
        public void start() {
            if (this.producer == null) {
                this.producer = KafkaConnectionFactory.createProducer(this.configs);
            }
        }

        /** @return {@code true} while the underlying Kafka client exists */
        public boolean isRunning() {
            return this.producer != null;
        }

        // NOTE(review): this is the compiler-generated bridge method as emitted by the
        // decompiler. It is kept so the declared interface stays unchanged; it probably has to
        // be deleted when this source is compiled again, since javac generates it itself.
        public /* bridge */ /* synthetic */ void send(String str, Object obj, Map map, CallBack callBack) {
            send(str, (KeyValue) obj, (Map<String, byte[]>) map, callBack);
        }
    }

    /**
     * Subscribes {@code callBack} to the given topics by delegating to the
     * {@code subscribe(List, CallBack)} method of the consumer returned by {@code getConsumer()}.
     *
     * @param list     topic names
     * @param callBack callback to register for those topics
     */
    public void subscribe(List<String> list, CallBack callBack) {
        final KafkaConsumer kafkaConsumer = (KafkaConsumer) getConsumer();
        kafkaConsumer.subscribe(list, callBack);
    }

    /**
     * Never returns normally: always throws the exception produced by
     * {@code SysException.unsupportedProvd()}.
     * NOTE(review): presumably signals that this provider cannot report queue emptiness — confirm.
     *
     * @param str not used by this implementation
     */
    public Boolean isEmpty(String str) {
        throw SysException.unsupportedProvd();
    }

    /**
     * Never returns normally: always throws the exception produced by
     * {@code SysException.unsupportedProvd()}.
     * NOTE(review): presumably signals that this provider cannot report queue emptiness — confirm.
     */
    public Boolean isEmpty() {
        throw SysException.unsupportedProvd();
    }

    /**
     * Builds a Kafka-backed producer from the given configuration.
     *
     * @param map producer configuration
     * @return a new {@code KafkaProducer}, or {@code null} when {@code map} is {@code null}
     */
    public Producer<KeyValue<String, T>> createProducer(Map map) {
        return map == null ? null : new KafkaProducer(map);
    }

    /**
     * Builds a Kafka-backed consumer from the given configuration.
     *
     * @param map consumer configuration
     * @return a new {@code KafkaConsumer}, or {@code null} when {@code map} is {@code null}
     */
    public Consumer createConsumer(Map map) {
        return map == null ? null : new KafkaConsumer(map);
    }
}
