package cn.kinyun.wework.sdk.consumer;

import cn.kinyun.wework.sdk.common.ServiceContext;
import cn.kinyun.wework.sdk.common.components.AbstractTopicConsumer;
import cn.kinyun.wework.sdk.common.utils.JacksonObjectMapper;
import cn.kinyun.wework.sdk.common.utils.JacksonUtil;
import cn.kinyun.wework.sdk.common.utils.KafkaClientUtils;
import cn.kinyun.wework.sdk.dao.entity.WeworkApiLog;
import cn.kinyun.wework.sdk.dao.mapper.WeworkApiLogMapper;
import cn.kinyun.wework.sdk.entity.ErrorCode;
import cn.kinyun.wework.sdk.exception.WeworkException;
import cn.kinyun.wework.sdk.exec.WeworkApiExecService;
import cn.kinyun.wework.sdk.finishLog.service.WeworkApiLogService;
import cn.kinyun.wework.sdk.response.ResponseBody;
import cn.kinyun.wework.sdk.utils.ApplicationContextUtils;
import com.alibaba.dubbo.rpc.RpcInvocation;
import java.lang.reflect.InvocationTargetException;
import java.util.Objects;
import javax.annotation.PostConstruct;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:cn/kinyun/wework/sdk/consumer/PartitionLogConsumer.class */
public class PartitionLogConsumer extends AbstractTopicConsumer {
    private static final Logger log = LoggerFactory.getLogger(PartitionLogConsumer.class);

    @Value("${kafka.topic.partitionApiLog:}")
    private String topic;

    @Value("${kafka.topic.apiResponse:dev_api_response}")
    private String requestBodyTopic;

    @Autowired
    private KafkaClientUtils kafkaClient;

    @Autowired
    private WeworkApiLogMapper weworkApiLogMapper;

    @Autowired
    private WeworkApiExecService execService;

    @Autowired
    private WeworkApiLogService weworkApiLogService;

    @Override // cn.kinyun.wework.sdk.common.components.AbstractTopicConsumer
    public String getTopic() {
        return this.topic;
    }

    @PostConstruct
    public void init() {
    }

    @Override // cn.kinyun.wework.sdk.common.components.AbstractTopicConsumer
    public void handle(ConsumerRecord<String, String> consumerRecord) {
        log.info("partitionLogConsumer handle {}", consumerRecord.value());
        String str = (String) consumerRecord.value();
        WeworkApiLog queryByRequestId = this.weworkApiLogMapper.queryByRequestId(str);
        if (Objects.isNull(queryByRequestId)) {
            log.info(" is null");
        } else {
            this.execService.put(queryByRequestId, () -> {
                long currentTimeMillis = System.currentTimeMillis();
                ServiceContext.getContext().setRequestId(str);
                String function = queryByRequestId.getFunction();
                String substring = function.substring(0, function.lastIndexOf("."));
                try {
                    RpcInvocation rpcInvocation = (RpcInvocation) JacksonObjectMapper.readValue(queryByRequestId.getRequestJson(), RpcInvocation.class);
                    Class[] parameterTypes = rpcInvocation.getParameterTypes();
                    Object[] arguments = rpcInvocation.getArguments();
                    Object invoke = invoke(substring, rpcInvocation, parameterTypes, arguments);
                    log.info("call {} request={} result={}", new Object[]{function, arguments, invoke});
                    this.weworkApiLogService.updateFinishLog(str, currentTimeMillis);
                    push2Kafka(str, function, invoke);
                } catch (Exception e) {
                    log.error("invoke with error, requestId={}", str, e);
                    if (this.weworkApiLogService.saveErrorLog(str, currentTimeMillis, e)) {
                        ErrorCode errorCode = new ErrorCode();
                        errorCode.setErrCode(-1);
                        errorCode.setErrMsg(StringUtils.abbreviate(e.getCause().toString(), 1024));
                        push2Kafka(str, function, errorCode);
                    }
                } catch (WeworkException e2) {
                    log.error("invoke with weworkError, requestId={}", str, e2);
                    if (this.weworkApiLogService.saveErrorLog(str, currentTimeMillis, e2)) {
                        ErrorCode errorCode2 = new ErrorCode();
                        errorCode2.setErrCode(e2.getErrorCode());
                        errorCode2.setErrMsg(e2.getErrorMsg());
                        push2Kafka(str, function, errorCode2);
                    }
                } finally {
                    ServiceContext.removeContext();
                }
            });
        }
    }

    private Object invoke(String str, RpcInvocation rpcInvocation, Class[] clsArr, Object[] objArr) throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException {
        Class<?> cls = Class.forName(str);
        return cls.getDeclaredMethod(rpcInvocation.getMethodName(), clsArr).invoke(ApplicationContextUtils.getBean(cls, "Impl"), objArr);
    }

    private void push2Kafka(String str, String str2, Object obj) {
        ResponseBody responseBody = new ResponseBody();
        responseBody.setApi(str2);
        responseBody.setRequestId(str);
        responseBody.setBody(JacksonUtil.obj2Str(obj));
        if (obj instanceof ErrorCode) {
            ErrorCode errorCode = (ErrorCode) obj;
            responseBody.setErrorCode(errorCode.getErrCode());
            responseBody.setErrorMsg(errorCode.getErrMsg());
        } else {
            responseBody.setErrorCode(-1);
        }
        this.kafkaClient.sendMessage(this.requestBodyTopic, str, JacksonUtil.obj2Str(responseBody));
    }
}
