package org.apache.storm.utils;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import junit.framework.TestCase;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/storm/utils/DisruptorQueueTest.class */
public class DisruptorQueueTest extends TestCase {
    private static final int TIMEOUT = 1000;
    private static final int PRODUCER_NUM = 4;

    /* loaded from: input_file:org/apache/storm/utils/DisruptorQueueTest$Consumer.class */
    private static class Consumer implements Runnable {
        private EventHandler handler;
        private DisruptorQueue queue;

        Consumer(DisruptorQueue disruptorQueue, EventHandler eventHandler) {
            this.handler = eventHandler;
            this.queue = disruptorQueue;
        }

        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    this.queue.consumeBatchWhenAvailable(this.handler);
                } catch (RuntimeException e) {
                    return;
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/storm/utils/DisruptorQueueTest$IncProducer.class */
    private static class IncProducer implements Runnable {
        private DisruptorQueue queue;
        private long _max;

        IncProducer(DisruptorQueue disruptorQueue, long j) {
            this.queue = disruptorQueue;
            this._max = j;
        }

        @Override // java.lang.Runnable
        public void run() {
            long j = 0;
            while (true) {
                long j2 = j;
                if (j2 >= this._max || Thread.currentThread().isInterrupted()) {
                    return;
                }
                this.queue.publish(Long.valueOf(j2));
                j = j2 + 1;
            }
        }
    }

    @Test
    public void testFirstMessageFirst() throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            DisruptorQueue createQueue = createQueue("firstMessageOrder", 16);
            createQueue.publish("FIRST");
            IncProducer incProducer = new IncProducer(createQueue, i + 100);
            final AtomicReference atomicReference = new AtomicReference();
            run(incProducer, new Consumer(createQueue, new EventHandler<Object>() { // from class: org.apache.storm.utils.DisruptorQueueTest.1
                private boolean head = true;

                public void onEvent(Object obj, long j, boolean z) throws Exception {
                    if (this.head) {
                        this.head = false;
                        atomicReference.set(obj);
                    }
                }
            }), createQueue);
            Assert.assertEquals("We expect to receive first published message first, but received " + atomicReference.get(), "FIRST", atomicReference.get());
        }
    }

    @Test
    public void testInOrder() throws InterruptedException {
        final AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        DisruptorQueue createQueue = createQueue("consumerHang", 1024);
        run(new IncProducer(createQueue, 1048576L), new Consumer(createQueue, new EventHandler<Object>() { // from class: org.apache.storm.utils.DisruptorQueueTest.2
            long _expected = 0;

            public void onEvent(Object obj, long j, boolean z) throws Exception {
                if (this._expected != ((Number) obj).longValue()) {
                    atomicBoolean.set(false);
                    System.out.println("Expected " + this._expected + " but got " + obj);
                }
                this._expected++;
            }
        }), createQueue, TIMEOUT, 1);
        Assert.assertTrue("Messages delivered out of order", atomicBoolean.get());
    }

    @Test
    public void testInOrderBatch() throws InterruptedException {
        final AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        DisruptorQueue createQueue = createQueue("consumerHang", 10, 1024);
        run(new IncProducer(createQueue, 1048576L), new Consumer(createQueue, new EventHandler<Object>() { // from class: org.apache.storm.utils.DisruptorQueueTest.3
            long _expected = 0;

            public void onEvent(Object obj, long j, boolean z) throws Exception {
                if (this._expected != ((Number) obj).longValue()) {
                    atomicBoolean.set(false);
                    System.out.println("Expected " + this._expected + " but got " + obj);
                }
                this._expected++;
            }
        }), createQueue, TIMEOUT, 1);
        Assert.assertTrue("Messages delivered out of order", atomicBoolean.get());
    }

    private void run(Runnable runnable, Runnable runnable2, DisruptorQueue disruptorQueue) throws InterruptedException {
        run(runnable, runnable2, disruptorQueue, 10, 4);
    }

    private void run(Runnable runnable, Runnable runnable2, DisruptorQueue disruptorQueue, int i, int i2) throws InterruptedException {
        Thread[] threadArr = new Thread[i2];
        for (int i3 = 0; i3 < i2; i3++) {
            threadArr[i3] = new Thread(runnable);
            threadArr[i3].start();
        }
        Thread thread = new Thread(runnable2);
        thread.start();
        Thread.sleep(i);
        for (int i4 = 0; i4 < i2; i4++) {
            threadArr[i4].interrupt();
        }
        for (int i5 = 0; i5 < i2; i5++) {
            threadArr[i5].join(1000L);
            assertFalse("producer " + i5 + " is still alive", threadArr[i5].isAlive());
        }
        disruptorQueue.haltWithInterrupt();
        thread.join(1000L);
        assertFalse("consumer is still alive", thread.isAlive());
    }

    private static DisruptorQueue createQueue(String str, int i) {
        return new DisruptorQueue(str, ProducerType.MULTI, i, 0L, 1, 1L);
    }

    private static DisruptorQueue createQueue(String str, int i, int i2) {
        return new DisruptorQueue(str, ProducerType.MULTI, i2, 0L, i, 1L);
    }
}
