package com.baijia.databus.app;

import com.alibaba.otter.canal.client.CanalConnector;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.base.MoreObjects;
import java.security.InvalidParameterException;
import java.util.BitSet;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/baijia/databus/app/SlidingWindowBatchManager.class */
/**
 * {@link BatchManagement} implementation that keeps outstanding batches in an
 * insertion-ordered map, bounds their number to a configurable window size and
 * acknowledges them on the {@link CanalConnector} strictly from the head of the map.
 *
 * <p>Locking discipline used throughout this class:
 * <ul>
 *   <li>{@code lock} serializes every structural change of the batch map (register,
 *       ack/remove, rollback/clear) and owns the {@code notFull} condition;</li>
 *   <li>{@code readLock}/{@code writeLock} protect the map itself against concurrent
 *       readers;</li>
 *   <li>when both are needed, {@code lock} is always taken first, and the write lock is
 *       never requested while the read lock is held (a {@link ReentrantReadWriteLock}
 *       cannot be upgraded);</li>
 *   <li>the per-batch {@link BitSet}s are guarded by their own monitor.</li>
 * </ul>
 */
public class SlidingWindowBatchManager implements BatchManagement {
    private static final Logger logger = LoggerFactory.getLogger(SlidingWindowBatchManager.class);
    private static final int DEFAULT_WINDOW_SIZE = 1024;
    private static final int DEFAULT_TIMEOUT_MS = 60000;
    /** Pause between two inspections of the batch map while waiting for a rollback. */
    private static final long ROLLBACK_POLL_INTERVAL_MS = 100L;

    private final int windowSize;
    private final int timeoutMillSecond;
    /** Outstanding batches keyed by batch id, in registration order. */
    private final Map<Long, BatchInformation> batchInfomationMap;
    private final AtomicLong lastAckedBatchId;
    private final AtomicBoolean isRollingback;
    private final ReentrantReadWriteLock readWriteLock;
    private final ReentrantReadWriteLock.WriteLock writeLock;
    private final ReentrantReadWriteLock.ReadLock readLock;
    private final ReentrantLock lock;
    private final Condition notFull;
    /** Replaceable through {@link #setConnector}; volatile so other threads see the swap. */
    private volatile CanalConnector connector;
    private final MetricRegistry metrics;
    private final Counter queueSize;
    private final Meter inBatchs;
    private final Meter outBatchs;
    private final Meter outMessage;
    private final boolean isMetricsEnabled;

    /**
     * Creates a manager without metrics.
     *
     * @param canalConnector    connector used for ack/rollback; must not be null
     * @param windowSize        maximum number of outstanding batches; non-positive values
     *                          fall back to {@value #DEFAULT_WINDOW_SIZE}
     * @param timeoutMillSecond age in milliseconds after which the oldest batch is reported
     *                          by {@link #checkTimeout()}; non-positive values fall back to
     *                          {@value #DEFAULT_TIMEOUT_MS}
     * @throws NullPointerException if {@code canalConnector} is null
     */
    public SlidingWindowBatchManager(CanalConnector canalConnector, int windowSize, int timeoutMillSecond) {
        this(canalConnector, windowSize, timeoutMillSecond, null, "default");
    }

    /**
     * Creates a manager, optionally publishing metrics.
     *
     * @param canalConnector    connector used for ack/rollback; must not be null
     * @param windowSize        maximum number of outstanding batches; non-positive values
     *                          fall back to {@value #DEFAULT_WINDOW_SIZE}
     * @param timeoutMillSecond timeout in milliseconds; non-positive values fall back to
     *                          {@value #DEFAULT_TIMEOUT_MS}
     * @param metricRegistry    registry to publish metrics to, or null to disable metrics
     * @param metricPrefix      prefix of the metric names ({@code <prefix>.batch.size},
     *                          {@code <prefix>.batch.in}, {@code <prefix>.batch.out},
     *                          {@code <prefix>.message.out})
     * @throws NullPointerException if {@code canalConnector} is null
     */
    public SlidingWindowBatchManager(CanalConnector canalConnector, int windowSize, int timeoutMillSecond,
                                     MetricRegistry metricRegistry, String metricPrefix) {
        if (canalConnector == null) {
            throw new NullPointerException("CanalConnector can not be null.");
        }
        this.connector = canalConnector;
        this.windowSize = windowSize > 0 ? windowSize : DEFAULT_WINDOW_SIZE;
        this.timeoutMillSecond = timeoutMillSecond > 0 ? timeoutMillSecond : DEFAULT_TIMEOUT_MS;
        this.batchInfomationMap = new LinkedHashMap<>(this.windowSize);
        this.lastAckedBatchId = new AtomicLong(-1L);
        this.isRollingback = new AtomicBoolean(false);
        this.readWriteLock = new ReentrantReadWriteLock(false);
        this.readLock = this.readWriteLock.readLock();
        this.writeLock = this.readWriteLock.writeLock();
        this.lock = new ReentrantLock();
        this.notFull = this.lock.newCondition();

        this.metrics = metricRegistry;
        this.isMetricsEnabled = metricRegistry != null;
        if (metricRegistry != null) {
            this.queueSize = metricRegistry.counter(String.format("%s.batch.size", metricPrefix));
            this.inBatchs = metricRegistry.meter(String.format("%s.batch.in", metricPrefix));
            this.outBatchs = metricRegistry.meter(String.format("%s.batch.out", metricPrefix));
            this.outMessage = metricRegistry.meter(String.format("%s.message.out", metricPrefix));
        } else {
            this.queueSize = null;
            this.inBatchs = null;
            this.outBatchs = null;
            this.outMessage = null;
        }
    }

    /** Replaces the connector used for subsequent ack/rollback calls. */
    @Override
    public void setConnector(CanalConnector canalConnector) {
        this.connector = canalConnector;
    }

    /** @return true once a message has been reported as failed and no rollback has completed since */
    @Override
    public boolean isRollingback() {
        return this.isRollingback.get();
    }

    /**
     * Logs a warning when the oldest outstanding batch was committed more than the
     * configured timeout ago. Only logs; the batch itself is left untouched.
     */
    @Override
    public void checkTimeout() {
        this.readLock.lock();
        try {
            BatchInformation oldest = firstBatchOrNull();
            if (oldest == null) {
                return;
            }
            Long committedAt = oldest.getCommitTimestamp();
            if (committedAt == null
                    || System.currentTimeMillis() - committedAt.longValue() <= this.timeoutMillSecond) {
                return;
            }
            logger.warn("[Batch-Timeout] Batch {} is committed at {}.",
                    oldest.getUuid(), new Date(committedAt.longValue()));
        } finally {
            this.readLock.unlock();
        }
    }

    /**
     * Adds a batch to the window, blocking while the window is full.
     *
     * <p>Missing commit timestamp, process status (all bits set, i.e. every message pending)
     * and failed status (no bit set) are filled in before the batch is stored. A negative
     * batch id or batch size is logged and ignored.
     *
     * <p>If the calling thread is interrupted while waiting for room, the batch is NOT
     * registered, a warning is logged and the thread's interrupt flag is restored.
     *
     * @param batchInformation batch to register
     * @throws NullPointerException if the batch id or batch size is null
     */
    @Override
    public void registerBatch(BatchInformation batchInformation) {
        if (batchInformation.getBatchId() == null || batchInformation.getBatchSize() == null) {
            throw new NullPointerException("batchId or batchSize can not be null.");
        }
        long batchId = batchInformation.getBatchId().longValue();
        int batchSize = batchInformation.getBatchSize().intValue();
        if (batchId < 0 || batchSize < 0) {
            logger.warn("batchId {}, batchSize {} will be ignored.", batchId, batchInformation.getBatchSize());
            return;
        }
        if (batchInformation.getCommitTimestamp() == null) {
            batchInformation.setCommitTimestamp(Long.valueOf(System.currentTimeMillis()));
        }
        if (batchInformation.getProcessStatus() == null) {
            BitSet pending = new BitSet(batchSize);
            pending.set(0, batchSize);
            batchInformation.setProcessStatus(pending);
        }
        if (batchInformation.getFailedStatus() == null) {
            batchInformation.setFailedStatus(new BitSet(batchSize));
        }

        this.lock.lock();
        try {
            // Every structural change of the map happens under `lock`, so size() is stable here.
            // Loop (not `if`): await() may wake spuriously and signalAll() wakes every waiter.
            while (this.batchInfomationMap.size() >= this.windowSize) {
                logFirstUnackedBatch();
                this.notFull.await();
            }
            this.writeLock.lock();
            try {
                this.batchInfomationMap.put(batchInformation.getBatchId(), batchInformation);
            } finally {
                this.writeLock.unlock();
            }
            if (this.isMetricsEnabled) {
                this.queueSize.inc();
                this.inBatchs.mark();
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of silently consuming it.
            Thread.currentThread().interrupt();
            logger.warn("Be interrupted.");
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Convenience overload that builds the {@link BatchInformation} from its parts.
     * A negative batch id or a non-positive batch size is logged and ignored.
     *
     * @param batchId   id of the batch
     * @param batchSize number of messages in the batch
     * @param uuid      identifier used in log output
     */
    @Override
    public void registerBatch(long batchId, int batchSize, String uuid) {
        if (batchId < 0 || batchSize <= 0) {
            logger.warn("batchId {}, batchSize {} will be ignored.", batchId, batchSize);
            return;
        }
        BatchInformation batchInformation = new BatchInformation();
        batchInformation.setBatchId(Long.valueOf(batchId));
        batchInformation.setBatchSize(Integer.valueOf(batchSize));
        batchInformation.setUuid(uuid);
        registerBatch(batchInformation);
    }

    /**
     * Records the outcome of one message of a batch.
     *
     * <p>On success the message's pending bit is cleared and, when no pending bit remains,
     * the batch is flagged as processed. On failure the pending and failed bits are set, the
     * batch is flagged as not processed and the manager enters the rolling-back state.
     * Unknown batches, or batches lacking their status bit sets, are logged and skipped.
     *
     * @param batchId    id of the batch
     * @param batchIndex index of the message inside the batch
     * @param finished   true if the message was handled successfully, false if it failed
     * @throws InvalidParameterException if {@code batchId} or {@code batchIndex} is negative
     */
    @Override
    public void setProcessStatus(long batchId, int batchIndex, boolean finished) {
        if (batchId < 0 || batchIndex < 0) {
            throw new InvalidParameterException(String.format(
                    "batchId or batchIndex is invalid, batchId: %d, batchIndex: %d", batchId, batchIndex));
        }

        BitSet processStatus;
        BitSet failedStatus;
        // Single lookup under the read lock: no unlocked access to the map and no window in
        // which the batch can disappear between a check and the use of its bit sets.
        this.readLock.lock();
        try {
            BatchInformation batch = this.batchInfomationMap.get(Long.valueOf(batchId));
            if (batch == null) {
                logger.warn("batch information map does not contains key {}, will pass it.", batchId);
                return;
            }
            logger.debug("set process status, batchId: {}, batchIndex: {}, finished: {}",
                    new Object[]{Long.valueOf(batchId), Integer.valueOf(batchIndex), Boolean.valueOf(finished)});
            processStatus = batch.getProcessStatus();
            failedStatus = batch.getFailedStatus();
            if (processStatus == null || failedStatus == null) {
                logger.warn("batch information does not contains processStatus or failedStatus.");
                return;
            }
        } finally {
            this.readLock.unlock();
        }

        if (finished) {
            boolean nothingPending;
            synchronized (processStatus) {
                processStatus.clear(batchIndex);
                // Evaluated under the same monitor as the update so the decision is consistent.
                nothingPending = processStatus.isEmpty();
            }
            if (nothingPending) {
                setProcessStatus(batchId, true);
            }
        } else {
            synchronized (processStatus) {
                processStatus.set(batchIndex);
            }
            synchronized (failedStatus) {
                failedStatus.set(batchIndex);
            }
            setProcessStatus(batchId, false);
            this.isRollingback.set(true);
        }
        if (this.isMetricsEnabled) {
            this.outMessage.mark();
        }
    }

    /**
     * @param batchId id of the batch
     * @return the batch's processed flag, or true when the batch is not (or no longer) tracked
     */
    @Override
    public boolean isProcessed(long batchId) {
        this.readLock.lock();
        try {
            BatchInformation batch = this.batchInfomationMap.get(Long.valueOf(batchId));
            return batch == null || batch.getIsProcessed();
        } finally {
            this.readLock.unlock();
        }
    }

    /**
     * @param batchId    id of the batch
     * @param batchIndex index of the message inside the batch
     * @return false only if the batch is tracked and the message's pending bit is set;
     *         true otherwise (including when the batch carries no process status)
     */
    @Override
    public boolean isProcessed(long batchId, int batchIndex) {
        BitSet processStatus;
        this.readLock.lock();
        try {
            BatchInformation batch = this.batchInfomationMap.get(Long.valueOf(batchId));
            if (batch == null) {
                return true;
            }
            processStatus = batch.getProcessStatus();
        } finally {
            this.readLock.unlock();
        }
        if (processStatus == null) {
            return true;
        }
        // BitSet.get() already answers false beyond length(); use the writers' monitor.
        synchronized (processStatus) {
            return !processStatus.get(batchIndex);
        }
    }

    /** Sets the processed flag of a tracked batch; negative or unknown ids are ignored. */
    private void setProcessStatus(long batchId, boolean processed) {
        if (batchId < 0) {
            // Checked before locking so the read lock can never be left held.
            return;
        }
        this.readLock.lock();
        try {
            BatchInformation batch = this.batchInfomationMap.get(Long.valueOf(batchId));
            if (batch != null) {
                batch.setIsProcessed(processed);
            }
        } finally {
            this.readLock.unlock();
        }
    }

    /**
     * Acknowledges every processed batch at the head of the window and reports whether
     * {@code batchId} ended up being the last one acknowledged.
     *
     * @param batchId id the caller is interested in; a negative id only triggers the drain
     * @return true if, after draining, the last acknowledged batch id equals {@code batchId};
     *         false for negative ids or when {@code batchId} already was the last acknowledged id
     * @throws NullPointerException if no connector is set
     */
    @Override
    public boolean tryAck(long batchId) {
        if (this.connector == null) {
            throw new NullPointerException("canal connector can not be null.");
        }
        logger.debug("Processing batchId  {}", batchId);
        if (batchId < 0) {
            ensureProcessedBeAcked();
            return false;
        }
        long lastAcked = this.lastAckedBatchId.get();
        if (batchId == lastAcked) {
            return false;
        }
        if (batchId < lastAcked) {
            logger.warn("tryAck sequence out of order: batchId {},  ackedBatchId {}", batchId, lastAcked);
        }
        ensureProcessedBeAcked();
        return this.lastAckedBatchId.get() == batchId;
    }

    /**
     * Acks and removes batches from the head of the window for as long as the head batch is
     * flagged as processed. Stops at the first unprocessed batch or when the window is empty.
     */
    private void ensureProcessedBeAcked() {
        while (true) {
            BatchInformation head;
            this.readLock.lock();
            try {
                head = firstBatchOrNull();
            } finally {
                this.readLock.unlock();
            }
            if (head == null) {
                return;
            }
            logger.debug("Preparing tryAck batchId {}", head.getBatchId());
            if (!head.getIsProcessed()) {
                if (logger.isDebugEnabled()) {
                    logger.debug("uuid: {}, bitset: {}",
                            MoreObjects.firstNonNull(head.getUuid(), ""), describe(head.getProcessStatus()));
                }
                return;
            }

            this.lock.lock();
            try {
                // Re-validate under `lock`: another thread may have acked or rolled back meanwhile.
                boolean stillTracked;
                this.readLock.lock();
                try {
                    stillTracked = this.batchInfomationMap.get(head.getBatchId()) == head;
                } finally {
                    this.readLock.unlock();
                }
                if (!stillTracked) {
                    continue;
                }

                // Network call is made without the read/write lock so readers are not stalled;
                // `lock` still keeps the map structurally unchanged until the removal below.
                CanalConnector active = ensureCanalConnectorActive();
                active.ack(head.getBatchId().longValue());

                this.writeLock.lock();
                try {
                    this.batchInfomationMap.remove(head.getBatchId());
                } finally {
                    this.writeLock.unlock();
                }
                if (this.isMetricsEnabled) {
                    this.queueSize.dec();
                    this.outBatchs.mark();
                }
                this.notFull.signalAll();
                logger.debug("batchId {} is acked.", head.getBatchId());
                this.lastAckedBatchId.set(head.getBatchId().longValue());
            } finally {
                this.lock.unlock();
            }
        }
    }

    /**
     * Reconnects and re-subscribes the connector when it reports itself as not valid.
     *
     * @return the connector that was checked
     * @throws NullPointerException if no connector is set
     */
    private CanalConnector ensureCanalConnectorActive() {
        CanalConnector active = this.connector;
        if (active == null) {
            throw new NullPointerException("CanalConnector can not be null");
        }
        if (!active.checkValid()) {
            active.connect();
            active.subscribe();
        }
        return active;
    }

    /**
     * Rolls the connector back, drops every tracked batch, wakes up blocked producers and
     * leaves the rolling-back state. Does nothing when no batch is tracked.
     */
    private void rollback() {
        this.lock.lock();
        try {
            BatchInformation head;
            this.readLock.lock();
            try {
                head = firstBatchOrNull();
            } finally {
                this.readLock.unlock();
            }
            if (head == null) {
                return;
            }
            logger.warn("Preparing rollback {}.", head.getUuid());

            this.connector.rollback();

            int dropped;
            this.writeLock.lock();
            try {
                dropped = this.batchInfomationMap.size();
                this.batchInfomationMap.clear();
            } finally {
                this.writeLock.unlock();
            }
            if (this.isMetricsEnabled) {
                this.queueSize.dec(dropped);
            }
            this.notFull.signalAll();
            this.isRollingback.set(false);
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * When the manager is in the rolling-back state, waits until no tracked batch is still
     * waiting for message outcomes and then performs the rollback. Returns immediately when
     * the manager is not rolling back.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    @Override
    public void waitForRollbackIfNecessary() throws InterruptedException {
        if (!this.isRollingback.get()) {
            return;
        }
        if (logger.isDebugEnabled()) {
            String snapshot;
            // Render under the read lock so a concurrent writer cannot break the iteration.
            this.readLock.lock();
            try {
                snapshot = String.valueOf(this.batchInfomationMap);
            } finally {
                this.readLock.unlock();
            }
            logger.debug("Waitting for rollback: batchInformationMap: {}", snapshot);
        }
        while (hasBatchAwaitingOutcome()) {
            // Sleep with no lock held so producers and ackers are not stalled.
            TimeUnit.MILLISECONDS.sleep(ROLLBACK_POLL_INTERVAL_MS);
        }
        rollback();
    }

    /**
     * @return true if some tracked batch is not flagged as processed and its number of pending
     *         bits differs from its number of failed bits; that batch is logged
     */
    private boolean hasBatchAwaitingOutcome() {
        this.readLock.lock();
        try {
            for (BatchInformation batch : this.batchInfomationMap.values()) {
                if (!batch.getIsProcessed()
                        && cardinalityOf(batch.getProcessStatus()) != cardinalityOf(batch.getFailedStatus())) {
                    logger.info("[waiting for {} rollback] batchFailedMap: {}, batchUnderProcessMap: {}",
                            new Object[]{batch.getUuid(), describe(batch.getFailedStatus()),
                                    describe(batch.getProcessStatus())});
                    return true;
                }
            }
            return false;
        } finally {
            this.readLock.unlock();
        }
    }

    /** Logs the status bit sets of the oldest tracked batch, if any. */
    private void logFirstUnackedBatch() {
        this.readLock.lock();
        try {
            BatchInformation oldest = firstBatchOrNull();
            if (oldest != null) {
                logger.info("First unacked batch {},  batchFailedMap: {}, batchUnderProcessMap: {}",
                        new Object[]{oldest.getUuid(), describe(oldest.getFailedStatus()),
                                describe(oldest.getProcessStatus())});
            }
        } finally {
            this.readLock.unlock();
        }
    }

    /** Returns the oldest tracked batch or null; the caller must hold the read or write lock. */
    private BatchInformation firstBatchOrNull() {
        Iterator<BatchInformation> batches = this.batchInfomationMap.values().iterator();
        return batches.hasNext() ? batches.next() : null;
    }

    /** Renders a bit set as its long-array form; null is rendered like an empty bit set. */
    private static String describe(BitSet bits) {
        BitSet safe = MoreObjects.firstNonNull(bits, new BitSet());
        synchronized (safe) {
            return ArrayUtils.toString(safe.toLongArray());
        }
    }

    /** Number of set bits, read under the bit set's monitor; 0 for null. */
    private static int cardinalityOf(BitSet bits) {
        if (bits == null) {
            return 0;
        }
        synchronized (bits) {
            return bits.cardinality();
        }
    }
}
