package com.baijia.databus;

import java.beans.ConstructorProperties;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/baijia/databus/AbstractProcessor.class */
public abstract class AbstractProcessor implements Processor {
    private static final Logger log = LoggerFactory.getLogger(AbstractProcessor.class);
    private static ExecutorService executorService = Executors.newCachedThreadPool();
    private volatile AtomicInteger retry = new AtomicInteger(0);

    /* loaded from: input_file:com/baijia/databus/AbstractProcessor$ProcessResult.class */
    public static class ProcessResult {
        public boolean success;
        public Processor processor;

        @ConstructorProperties({"success", "processor"})
        public ProcessResult(boolean z, Processor processor) {
            this.success = z;
            this.processor = processor;
        }
    }

    @Override // com.baijia.databus.Processor
    public int maxRetry() {
        return 3;
    }

    @Override // com.baijia.databus.Processor
    public Future<ProcessResult> onChanged(List<ChangedRow> list, boolean z, Processor processor) {
        if (z) {
            this.retry.set(0);
        } else if (this.retry.incrementAndGet() > maxRetry()) {
            log.error("[Kafke processor danger] processor {} has exceed the max retry time {} of process messages {}", new Object[]{this, Integer.valueOf(maxRetry()), list});
            return null;
        }
        List list2 = (List) list.parallelStream().map((v0) -> {
            return v0.basicInfo();
        }).collect(Collectors.toList());
        List list3 = (List) list.stream().filter(changedRow -> {
            return filter(changedRow);
        }).collect(Collectors.toList());
        log.info("Changed row infos {}, filtered result {}, processor {}", new Object[]{list2, list3.parallelStream().map((v0) -> {
            return v0.basicInfo();
        }).collect(Collectors.toList()), getClass().getName()});
        return list3.isEmpty() ? CompletableFuture.completedFuture(new ProcessResult(true, processor)) : executorService.submit(() -> {
            try {
                return new ProcessResult(process(list3), processor);
            } catch (Exception e) {
                log.error("Error while call processor's process, will return false and retry", e);
                return new ProcessResult(false, processor);
            }
        });
    }

    protected abstract boolean process(List<ChangedRow> list);

    protected boolean filter(ChangedRow changedRow) {
        return true;
    }

    @Override // com.baijia.databus.Processor
    public int order() {
        return 0;
    }
}
