package org.apache.storm.utils;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import junit.framework.TestCase;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/utils/DisruptorQueueBackpressureTest.class */
public class DisruptorQueueBackpressureTest extends TestCase {
    private static final Logger LOG = LoggerFactory.getLogger(DisruptorQueueBackpressureTest.class);
    private static final int MESSAGES = 100;
    private static final int CAPACITY = 128;
    private static final double HIGH_WATERMARK = 0.6d;
    private static final double LOW_WATERMARK = 0.2d;

    /* loaded from: input_file:org/apache/storm/utils/DisruptorQueueBackpressureTest$DisruptorBackpressureCallbackImpl.class */
    class DisruptorBackpressureCallbackImpl implements DisruptorBackpressureCallback {
        public long highWaterMarkCalledPopulation = -1;
        public long lowWaterMarkCalledPopulation = -1;
        DisruptorQueue queue;
        AtomicBoolean throttleOn;
        AtomicLong consumerCursor;

        public DisruptorBackpressureCallbackImpl(DisruptorQueue disruptorQueue, AtomicBoolean atomicBoolean, AtomicLong atomicLong) {
            this.queue = disruptorQueue;
            this.throttleOn = atomicBoolean;
            this.consumerCursor = atomicLong;
        }

        public void highWaterMark() throws Exception {
            if (this.throttleOn.get()) {
                return;
            }
            this.highWaterMarkCalledPopulation = this.queue.getMetrics().population() + this.queue.getMetrics().overflow();
            this.throttleOn.set(true);
        }

        public void lowWaterMark() throws Exception {
            if (this.throttleOn.get()) {
                this.lowWaterMarkCalledPopulation = (this.queue.getMetrics().writePos() - this.consumerCursor.get()) + this.queue.getMetrics().overflow();
                this.throttleOn.set(false);
            }
        }
    }

    @Test
    public void testBackPressureCallback() throws Exception {
        DisruptorQueue createQueue = createQueue("testBackPressure", CAPACITY);
        createQueue.setEnableBackpressure(true);
        createQueue.setHighWaterMark(HIGH_WATERMARK);
        createQueue.setLowWaterMark(LOW_WATERMARK);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        final AtomicLong atomicLong = new AtomicLong(-1L);
        DisruptorBackpressureCallbackImpl disruptorBackpressureCallbackImpl = new DisruptorBackpressureCallbackImpl(createQueue, atomicBoolean, atomicLong);
        createQueue.registerBackpressureCallback(disruptorBackpressureCallbackImpl);
        for (int i = 0; i < MESSAGES; i++) {
            createQueue.publish(String.valueOf(i));
        }
        createQueue.consumeBatchWhenAvailable(new EventHandler<Object>() { // from class: org.apache.storm.utils.DisruptorQueueBackpressureTest.1
            public void onEvent(Object obj, long j, boolean z) throws Exception {
                atomicLong.set(j);
            }
        });
        Assert.assertEquals("Check the calling time of throttle on. ", createQueue.getHighWaterMark(), disruptorBackpressureCallbackImpl.highWaterMarkCalledPopulation);
        Assert.assertEquals("Checking the calling time of throttle off. ", createQueue.getLowWaterMark(), disruptorBackpressureCallbackImpl.lowWaterMarkCalledPopulation);
    }

    private static DisruptorQueue createQueue(String str, int i) {
        return new DisruptorQueue(str, ProducerType.MULTI, i, 0L, 1, 1L);
    }
}
