package com.alibaba.nacos.core.persistence;

import com.alibaba.nacos.api.exception.runtime.NacosRuntimeException;
import com.alibaba.nacos.common.JustForTest;
import com.alibaba.nacos.common.model.RestResult;
import com.alibaba.nacos.common.model.RestResultUtils;
import com.alibaba.nacos.common.notify.Event;
import com.alibaba.nacos.common.notify.NotifyCenter;
import com.alibaba.nacos.common.notify.listener.Subscriber;
import com.alibaba.nacos.common.utils.ExceptionUtil;
import com.alibaba.nacos.common.utils.LoggerUtils;
import com.alibaba.nacos.common.utils.MD5Utils;
import com.alibaba.nacos.common.utils.Preconditions;
import com.alibaba.nacos.common.utils.StringUtils;
import com.alibaba.nacos.consistency.SerializeFactory;
import com.alibaba.nacos.consistency.Serializer;
import com.alibaba.nacos.consistency.cp.CPProtocol;
import com.alibaba.nacos.consistency.cp.RequestProcessor4CP;
import com.alibaba.nacos.consistency.entity.ReadRequest;
import com.alibaba.nacos.consistency.entity.Response;
import com.alibaba.nacos.consistency.entity.WriteRequest;
import com.alibaba.nacos.consistency.exception.ConsistencyException;
import com.alibaba.nacos.consistency.snapshot.SnapshotOperation;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.core.distributed.ProtocolManager;
import com.alibaba.nacos.core.distributed.raft.RaftSysConstants;
import com.alibaba.nacos.core.utils.ClassUtils;
import com.alibaba.nacos.persistence.configuration.condition.ConditionDistributedEmbedStorage;
import com.alibaba.nacos.persistence.datasource.DynamicDataSource;
import com.alibaba.nacos.persistence.datasource.LocalDataSourceServiceImpl;
import com.alibaba.nacos.persistence.exception.NJdbcException;
import com.alibaba.nacos.persistence.model.event.DerbyLoadEvent;
import com.alibaba.nacos.persistence.model.event.RaftDbErrorEvent;
import com.alibaba.nacos.persistence.repository.RowMapperManager;
import com.alibaba.nacos.persistence.repository.embedded.EmbeddedStorageContextHolder;
import com.alibaba.nacos.persistence.repository.embedded.hook.EmbeddedApplyHook;
import com.alibaba.nacos.persistence.repository.embedded.hook.EmbeddedApplyHookHolder;
import com.alibaba.nacos.persistence.repository.embedded.operate.BaseDatabaseOperate;
import com.alibaba.nacos.persistence.repository.embedded.sql.ModifyRequest;
import com.alibaba.nacos.persistence.repository.embedded.sql.SelectRequest;
import com.alibaba.nacos.persistence.repository.embedded.sql.limiter.SqlLimiter;
import com.alibaba.nacos.persistence.repository.embedded.sql.limiter.SqlTypeLimiter;
import com.alibaba.nacos.persistence.utils.PersistenceExecutor;
import com.alibaba.nacos.sys.utils.DiskUtils;
import com.google.protobuf.ByteString;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Conditional;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.DataIntegrityViolationException;
import org.springframework.jdbc.BadSqlGrammarException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Component;
import org.springframework.transaction.support.TransactionTemplate;

@Conditional({ConditionDistributedEmbedStorage.class})
@Component
/* loaded from: input_file:com/alibaba/nacos/core/persistence/DistributedDatabaseOperateImpl.class */
public class DistributedDatabaseOperateImpl extends RequestProcessor4CP implements BaseDatabaseOperate {
    private static final Logger LOGGER = LoggerFactory.getLogger(DistributedDatabaseOperateImpl.class);
    private static final String DATA_IMPORT_KEY = "00--0-data_import-0--00";
    private final ServerMemberManager memberManager;
    private CPProtocol protocol;
    private LocalDataSourceServiceImpl dataSourceService;
    private JdbcTemplate jdbcTemplate;
    private TransactionTemplate transactionTemplate;
    private final Serializer serializer = SerializeFactory.getDefault();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final ReentrantReadWriteLock.ReadLock readLock = this.lock.readLock();
    private final ReentrantReadWriteLock.WriteLock writeLock = this.lock.writeLock();
    private final SqlLimiter sqlLimiter;

    public DistributedDatabaseOperateImpl(ServerMemberManager serverMemberManager, ProtocolManager protocolManager) throws Exception {
        this.memberManager = serverMemberManager;
        this.protocol = protocolManager.getCpProtocol();
        init();
        this.sqlLimiter = new SqlTypeLimiter();
    }

    protected void init() throws Exception {
        this.dataSourceService = DynamicDataSource.getInstance().getDataSource();
        this.dataSourceService.cleanAndReopenDerby();
        this.jdbcTemplate = this.dataSourceService.getJdbcTemplate();
        this.transactionTemplate = this.dataSourceService.getTransactionTemplate();
        NotifyCenter.registerToSharePublisher(RaftDbErrorEvent.class);
        NotifyCenter.registerToSharePublisher(DerbyLoadEvent.class);
        NotifyCenter.registerSubscriber(new Subscriber<RaftDbErrorEvent>() { // from class: com.alibaba.nacos.core.persistence.DistributedDatabaseOperateImpl.1
            public void onEvent(RaftDbErrorEvent raftDbErrorEvent) {
                DistributedDatabaseOperateImpl.this.dataSourceService.setHealthStatus("DOWN");
            }

            public Class<? extends Event> subscribeType() {
                return RaftDbErrorEvent.class;
            }
        });
        this.protocol.addRequestProcessors(Collections.singletonList(this));
        LOGGER.info("use DistributedTransactionServicesImpl");
    }

    @JustForTest
    public void mockConsistencyProtocol(CPProtocol cPProtocol) {
        this.protocol = cPProtocol;
    }

    public <R> R queryOne(String str, Class<R> cls) {
        try {
            LoggerUtils.printIfDebugEnabled(LOGGER, "queryOne info : sql : {}", new Object[]{str});
            Response innerRead = innerRead(ReadRequest.newBuilder().setGroup(group()).setData(ByteString.copyFrom(this.serializer.serialize(SelectRequest.builder().queryType((byte) 1).sql(str).className(cls.getCanonicalName()).build()))).build(), EmbeddedStorageContextHolder.containsExtendInfo("00--0-read-join-0--00"));
            if (innerRead.getSuccess()) {
                return (R) this.serializer.deserialize(innerRead.getData().toByteArray(), cls);
            }
            throw new NJdbcException(innerRead.getErrMsg(), innerRead.getErrMsg());
        } catch (Exception e) {
            LOGGER.error("An exception occurred during the query operation : {}", e.toString());
            throw new NacosRuntimeException(500, e.toString());
        }
    }

    public <R> R queryOne(String str, Object[] objArr, Class<R> cls) {
        try {
            LoggerUtils.printIfDebugEnabled(LOGGER, "queryOne info : sql : {}, args : {}", new Object[]{str, objArr});
            Response innerRead = innerRead(ReadRequest.newBuilder().setGroup(group()).setData(ByteString.copyFrom(this.serializer.serialize(SelectRequest.builder().queryType((byte) 2).sql(str).args(objArr).className(cls.getCanonicalName()).build()))).build(), EmbeddedStorageContextHolder.containsExtendInfo("00--0-read-join-0--00"));
            if (innerRead.getSuccess()) {
                return (R) this.serializer.deserialize(innerRead.getData().toByteArray(), cls);
            }
            throw new NJdbcException(innerRead.getErrMsg(), innerRead.getErrMsg());
        } catch (Exception e) {
            LOGGER.error("An exception occurred during the query operation : {}", e.toString());
            throw new NacosRuntimeException(500, e.toString());
        }
    }

    public <R> R queryOne(String str, Object[] objArr, RowMapper<R> rowMapper) {
        try {
            LoggerUtils.printIfDebugEnabled(LOGGER, "queryOne info : sql : {}, args : {}", new Object[]{str, objArr});
            Response innerRead = innerRead(ReadRequest.newBuilder().setGroup(group()).setData(ByteString.copyFrom(this.serializer.serialize(SelectRequest.builder().queryType((byte) 0).sql(str).args(objArr).className(rowMapper.getClass().getCanonicalName()).build()))).build(), EmbeddedStorageContextHolder.containsExtendInfo("00--0-read-join-0--00"));
            if (innerRead.getSuccess()) {
                return (R) this.serializer.deserialize(innerRead.getData().toByteArray(), ClassUtils.resolveGenericTypeByInterface(rowMapper.getClass()));
            }
            throw new NJdbcException(innerRead.getErrMsg(), innerRead.getErrMsg());
        } catch (Exception e) {
            LOGGER.error("An exception occurred during the query operation : {}", e.toString());
            throw new NacosRuntimeException(500, e.toString());
        }
    }

    public <R> List<R> queryMany(String str, Object[] objArr, RowMapper<R> rowMapper) {
        try {
            LoggerUtils.printIfDebugEnabled(LOGGER, "queryMany info : sql : {}, args : {}", new Object[]{str, objArr});
            Response innerRead = innerRead(ReadRequest.newBuilder().setGroup(group()).setData(ByteString.copyFrom(this.serializer.serialize(SelectRequest.builder().queryType((byte) 3).sql(str).args(objArr).className(rowMapper.getClass().getCanonicalName()).build()))).build(), EmbeddedStorageContextHolder.containsExtendInfo("00--0-read-join-0--00"));
            if (innerRead.getSuccess()) {
                return (List) this.serializer.deserialize(innerRead.getData().toByteArray(), List.class);
            }
            throw new NJdbcException(innerRead.getErrMsg());
        } catch (Exception e) {
            LOGGER.error("An exception occurred during the query operation : {}", e.toString());
            throw new NacosRuntimeException(500, e.toString());
        }
    }

    public <R> List<R> queryMany(String str, Object[] objArr, Class<R> cls) {
        try {
            LoggerUtils.printIfDebugEnabled(LOGGER, "queryMany info : sql : {}, args : {}", new Object[]{str, objArr});
            Response innerRead = innerRead(ReadRequest.newBuilder().setGroup(group()).setData(ByteString.copyFrom(this.serializer.serialize(SelectRequest.builder().queryType((byte) 5).sql(str).args(objArr).className(cls.getCanonicalName()).build()))).build(), EmbeddedStorageContextHolder.containsExtendInfo("00--0-read-join-0--00"));
            if (innerRead.getSuccess()) {
                return (List) this.serializer.deserialize(innerRead.getData().toByteArray(), List.class);
            }
            throw new NJdbcException(innerRead.getErrMsg());
        } catch (Exception e) {
            LOGGER.error("An exception occurred during the query operation : {}", e.toString());
            throw new NacosRuntimeException(500, e.toString());
        }
    }

    public List<Map<String, Object>> queryMany(String str, Object[] objArr) {
        try {
            LoggerUtils.printIfDebugEnabled(LOGGER, "queryMany info : sql : {}, args : {}", new Object[]{str, objArr});
            Response innerRead = innerRead(ReadRequest.newBuilder().setGroup(group()).setData(ByteString.copyFrom(this.serializer.serialize(SelectRequest.builder().queryType((byte) 4).sql(str).args(objArr).build()))).build(), EmbeddedStorageContextHolder.containsExtendInfo("00--0-read-join-0--00"));
            if (innerRead.getSuccess()) {
                return (List) this.serializer.deserialize(innerRead.getData().toByteArray(), List.class);
            }
            throw new NJdbcException(innerRead.getErrMsg());
        } catch (Exception e) {
            LOGGER.error("An exception occurred during the query operation : {}", e.toString());
            throw new NacosRuntimeException(500, e.toString());
        }
    }

    private Response innerRead(ReadRequest readRequest, boolean z) throws Exception {
        return z ? (Response) this.protocol.aGetData(readRequest).join() : this.protocol.getData(readRequest);
    }

    public CompletableFuture<RestResult<String>> dataImport(File file) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                DiskUtils.LineIterator lineIterator = DiskUtils.lineIterator(file);
                try {
                    ArrayList arrayList = new ArrayList(RaftSysConstants.DEFAULT_MAX_ELECTION_DELAY_MS);
                    ArrayList arrayList2 = new ArrayList();
                    while (lineIterator.hasNext()) {
                        String next = lineIterator.next();
                        if (StringUtils.isNotBlank(next)) {
                            arrayList.add(next);
                        }
                        if (arrayList.size() == 1000 || !lineIterator.hasNext()) {
                            arrayList2.add(this.protocol.writeAsync(WriteRequest.newBuilder().setGroup(group()).setData(ByteString.copyFrom(this.serializer.serialize((List) arrayList.stream().map(ModifyRequest::new).collect(Collectors.toList())))).putExtendInfo(DATA_IMPORT_KEY, Boolean.TRUE.toString()).build()));
                            arrayList.clear();
                        }
                    }
                    CompletableFuture.allOf((CompletableFuture[]) arrayList2.toArray(new CompletableFuture[0])).join();
                    Iterator it = arrayList2.iterator();
                    while (it.hasNext()) {
                        Response response = (Response) ((CompletableFuture) it.next()).get();
                        if (!response.getSuccess()) {
                            RestResult failed = RestResultUtils.failed(response.getErrMsg());
                            if (lineIterator != null) {
                                lineIterator.close();
                            }
                            return failed;
                        }
                    }
                    RestResult success = RestResultUtils.success();
                    if (lineIterator != null) {
                        lineIterator.close();
                    }
                    return success;
                } finally {
                }
            } catch (Throwable th) {
                LOGGER.error("data import has error :", th);
                return RestResultUtils.failed(th.getMessage());
            }
        });
    }

    public Boolean update(List<ModifyRequest> list, BiConsumer<Boolean, Throwable> biConsumer) {
        try {
            LoggerUtils.printIfDebugEnabled(LOGGER, "modifyRequests info : {}", new Object[]{list});
            WriteRequest build = WriteRequest.newBuilder().setGroup(group()).setKey(System.currentTimeMillis() + "-" + group() + "-" + this.memberManager.getSelf().getAddress() + "-" + MD5Utils.md5Hex(list.toString(), "UTF-8")).setData(ByteString.copyFrom(this.serializer.serialize(list))).putAllExtendInfo(EmbeddedStorageContextHolder.getCurrentExtendInfo()).setType(list.getClass().getCanonicalName()).build();
            if (!Objects.isNull(biConsumer)) {
                this.protocol.writeAsync(build).whenComplete((response, th) -> {
                    String errMsg = Objects.isNull(th) ? response.getErrMsg() : ExceptionUtil.getCause(th).getMessage();
                    biConsumer.accept(Boolean.valueOf(response.getSuccess()), StringUtils.isBlank(errMsg) ? null : new NJdbcException(errMsg));
                });
                return true;
            }
            Response write = this.protocol.write(build);
            if (write.getSuccess()) {
                return true;
            }
            LOGGER.error("execute sql modify operation failed : {}", write.getErrMsg());
            return false;
        } catch (TimeoutException e) {
            LOGGER.error("An timeout exception occurred during the update operation");
            throw new NacosRuntimeException(500, e.toString());
        } catch (Throwable th2) {
            LOGGER.error("An exception occurred during the update operation : {}", th2);
            throw new NacosRuntimeException(500, th2.toString());
        }
    }

    public List<SnapshotOperation> loadSnapshotOperate() {
        return Collections.singletonList(new DerbySnapshotOperation(this.writeLock));
    }

    public Response onRequest(ReadRequest readRequest) {
        Object queryMany;
        this.readLock.lock();
        try {
            try {
                SelectRequest selectRequest = (SelectRequest) this.serializer.deserialize(readRequest.getData().toByteArray(), SelectRequest.class);
                LoggerUtils.printIfDebugEnabled(LOGGER, "getData info : selectRequest : {}", new Object[]{selectRequest});
                this.sqlLimiter.doLimitForSelectRequest(selectRequest);
                RowMapper rowMapper = RowMapperManager.getRowMapper(selectRequest.getClassName());
                switch (selectRequest.getQueryType()) {
                    case 0:
                        queryMany = queryOne(this.jdbcTemplate, selectRequest.getSql(), selectRequest.getArgs(), rowMapper);
                        break;
                    case 1:
                        queryMany = queryOne(this.jdbcTemplate, selectRequest.getSql(), ClassUtils.findClassByName(selectRequest.getClassName()));
                        break;
                    case 2:
                        queryMany = queryOne(this.jdbcTemplate, selectRequest.getSql(), selectRequest.getArgs(), ClassUtils.findClassByName(selectRequest.getClassName()));
                        break;
                    case 3:
                        queryMany = queryMany(this.jdbcTemplate, selectRequest.getSql(), selectRequest.getArgs(), rowMapper);
                        break;
                    case RaftSysConstants.DEFAULT_RAFT_CLI_SERVICE_THREAD_NUM /* 4 */:
                        queryMany = queryMany(this.jdbcTemplate, selectRequest.getSql(), selectRequest.getArgs());
                        break;
                    case 5:
                        queryMany = queryMany(this.jdbcTemplate, selectRequest.getSql(), selectRequest.getArgs(), ClassUtils.findClassByName(selectRequest.getClassName()));
                        break;
                    default:
                        throw new IllegalArgumentException("Unsupported data query categories");
                }
                Response build = Response.newBuilder().setSuccess(true).setData(queryMany == null ? ByteString.EMPTY : ByteString.copyFrom(this.serializer.serialize(queryMany))).build();
                this.readLock.unlock();
                return build;
            } catch (Exception e) {
                LOGGER.error("There was an error querying the data, request : {}, error : {}", (Object) null, e.toString());
                Response build2 = Response.newBuilder().setSuccess(false).setErrMsg(ClassUtils.getSimplaName(e) + ":" + ExceptionUtil.getCause(e).getMessage()).build();
                this.readLock.unlock();
                return build2;
            }
        } catch (Throwable th) {
            this.readLock.unlock();
            throw th;
        }
    }

    public Response onApply(WriteRequest writeRequest) {
        boolean booleanValue;
        LoggerUtils.printIfDebugEnabled(LOGGER, "onApply info : log : {}", new Object[]{writeRequest});
        ByteString data = writeRequest.getData();
        Preconditions.checkArgument(data != null, "Log.getData() must not null");
        ReentrantReadWriteLock.ReadLock readLock = this.readLock;
        readLock.lock();
        try {
            try {
                try {
                    List list = (List) this.serializer.deserialize(data.toByteArray(), List.class);
                    this.sqlLimiter.doLimitForModifyRequest(list);
                    if (writeRequest.containsExtendInfo(DATA_IMPORT_KEY)) {
                        booleanValue = doDataImport(this.jdbcTemplate, list).booleanValue();
                    } else {
                        list.sort(Comparator.comparingInt((v0) -> {
                            return v0.getExecuteNo();
                        }));
                        booleanValue = update(this.transactionTemplate, this.jdbcTemplate, list).booleanValue();
                        PersistenceExecutor.executeEmbeddedDump(() -> {
                            Iterator it = EmbeddedApplyHookHolder.getInstance().getAllHooks().iterator();
                            while (it.hasNext()) {
                                ((EmbeddedApplyHook) it.next()).afterApply(writeRequest);
                            }
                        });
                    }
                    Response build = Response.newBuilder().setSuccess(booleanValue).build();
                    readLock.unlock();
                    return build;
                } catch (BadSqlGrammarException | DataIntegrityViolationException e) {
                    Response build2 = Response.newBuilder().setSuccess(false).setErrMsg(e.toString()).build();
                    readLock.unlock();
                    return build2;
                }
            } catch (Exception e2) {
                LoggerUtils.printIfWarnEnabled(LOGGER, "onApply warn : log : {}", new Object[]{writeRequest, e2});
                Response build3 = Response.newBuilder().setSuccess(false).setErrMsg(e2.toString()).build();
                readLock.unlock();
                return build3;
            } catch (DataAccessException e3) {
                throw new ConsistencyException(e3.toString());
            }
        } catch (Throwable th) {
            readLock.unlock();
            throw th;
        }
    }

    public void onError(Throwable th) {
        NotifyCenter.publishEvent(new RaftDbErrorEvent(th));
    }

    public String group() {
        return "nacos_config";
    }
}
