/*
 * Decompiled with CFR 0.152.
 */
package cn.kinyun.crm.canal.handler;

import cn.kinyun.crm.canal.CanalFetchHandlerContainer;
import cn.kinyun.crm.canal.CanalMessage;
import cn.kinyun.crm.canal.annotation.CanalDataFetchHandle;
import cn.kinyun.crm.canal.entity.BaseEntityId;
import cn.kinyun.crm.canal.handler.CanalFetchHandler;
import cn.kinyun.crm.common.enums.YnEnum;
import cn.kinyun.crm.dal.canal.entity.CanalFetchData;
import cn.kinyun.crm.dal.canal.mapper.CanalFetchDataMapper;
import cn.kinyun.crm.dal.util.BizTableContext;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.TypeReference;
import com.alibaba.fastjson.parser.Feature;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.kuaike.scrm.common.service.ScrmBizService;
import com.kuaike.scrm.common.service.dto.resp.BizSimpleDto;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.Resource;
import org.apache.commons.collections4.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;

public abstract class AbstractCanalFetchHandler<T extends Serializable>
implements CanalFetchHandler<T>,
InitializingBean {
    private static final Logger log = LoggerFactory.getLogger(AbstractCanalFetchHandler.class);
    private static final String BIZID_FIELD = "bizId";
    @Resource
    private CanalFetchDataMapper canalFetchDataMapper;
    @Autowired
    private ScrmBizService scrmBizService;

    public void afterPropertiesSet() throws Exception {
        Class<?> aClass = this.getClass();
        CanalDataFetchHandle annotation = aClass.getAnnotation(CanalDataFetchHandle.class);
        if (Objects.nonNull(annotation)) {
            for (String tableName : annotation.tableName()) {
                CanalFetchHandlerContainer.putByTableName(tableName, this);
            }
        }
    }

    public Long getBizId(CanalMessage<T> canalMessage) {
        List<T> messageData = canalMessage.getData();
        if (CollectionUtils.isEmpty(messageData)) {
            return null;
        }
        Serializable entity = (Serializable)messageData.get(0);
        Class<?> entityClass = entity.getClass();
        try {
            Field field = entityClass.getDeclaredField(BIZID_FIELD);
            field.setAccessible(true);
            Object bizId = field.get(entity);
            log.info("canal fetch data bizId:{}", bizId);
            if (bizId instanceof Long) {
                return (Long)bizId;
            }
        }
        catch (IllegalAccessException | NoSuchFieldException e) {
            e.printStackTrace();
        }
        return null;
    }

    @Override
    public void handle(String msg) {
        ParameterizedType sup = (ParameterizedType)this.getClass().getGenericSuperclass();
        Type[] actualTypeArguments = sup.getActualTypeArguments();
        if (Objects.isNull(actualTypeArguments)) {
            log.error("\u672a\u6307\u5b9a\u6cdb\u578b\u7c7b\u578b\uff0c\u65e0\u6cd5\u53cd\u5e8f\u5217\u5316");
            return;
        }
        Type actualTypeArgument = actualTypeArguments[0];
        CanalMessage canalMessage = (CanalMessage)JSON.parseObject((String)msg, (TypeReference)new TypeReference<CanalMessage<T>>(new Type[]{actualTypeArgument}){}, (Feature[])new Feature[0]);
        CanalEntry.EventType type = canalMessage.getType();
        Long bizId = this.getBizId(canalMessage);
        if (Objects.isNull(bizId)) {
            log.error("\u6d88\u8d39canal \u6d88\u606f\u672a\u627e\u5230\u5bf9\u5e94\u6570\u636e\u7684bigId\uff1amsg:{}", canalMessage.getData());
            return;
        }
        BizTableContext.putBizId((Long)bizId);
        BizSimpleDto bizInfo = this.scrmBizService.getById(bizId);
        if (Objects.isNull(bizInfo) || YnEnum.NO.getValue().equals(bizInfo.getIsOpenCrm())) {
            log.error("biz\u4fe1\u606f\u4e3a\u7a7a\uff0c\u6216\u8005\u6ca1\u6709\u5f00\u901acrm,\u65e0\u6cd5\u540c\u6b65\u4fe1\u606f bizId:{}", (Object)bizId);
            return;
        }
        List canalFetchDatas = this.canalFetchDataMapper.queryByDatabaseName(canalMessage.getDatabase(), canalMessage.getTable(), canalMessage.getId(), canalMessage.getEs(), Integer.valueOf(type.getNumber()));
        if (CollectionUtils.isNotEmpty((Collection)canalFetchDatas)) {
            for (CanalFetchData canalFetchData : canalFetchDatas) {
                List jsonObjects = JSON.parseArray((String)canalFetchData.getBizData(), JSONObject.class);
                if (!CollectionUtils.isNotEmpty((Collection)jsonObjects)) continue;
                List existData = jsonObjects.stream().filter(item -> Objects.nonNull(item.getLong("id"))).map(item -> item.getLong("id")).collect(Collectors.toList());
                List data = canalMessage.getData();
                for (Serializable datum : data) {
                    Long id;
                    if (!(datum instanceof BaseEntityId) || !existData.contains(id = ((BaseEntityId)((Object)datum)).getId())) continue;
                    log.error("canal \u91cd\u590d\u6d88\u8d39\u6570\u636e\uff0c\u5f53\u524d\u6570\u636e\u5df2\u540c\u6b65,:msg:{}", canalMessage.getData());
                    return;
                }
            }
        }
        log.info("canal type:{}", (Object)type);
        switch (type) {
            case DELETE: {
                this.delete(canalMessage);
                break;
            }
            case INSERT: {
                this.insert(canalMessage);
                break;
            }
            case UPDATE: {
                this.update(canalMessage);
                break;
            }
        }
        this.canalFetchDataMapper.insert((Object)this.buildCanalFetchData(canalMessage));
        BizTableContext.clear();
    }

    private CanalFetchData buildCanalFetchData(CanalMessage<T> message) {
        CanalFetchData canalFetchData = new CanalFetchData();
        canalFetchData.setSqlStatement(message.getSql());
        canalFetchData.setCanalId(message.getId());
        canalFetchData.setTableName(message.getTable());
        canalFetchData.setDatabaseName(message.getDatabase());
        canalFetchData.setEs(message.getEs());
        canalFetchData.setTs(message.getTs());
        canalFetchData.setIsDdl(Integer.valueOf(message.getIsDdl() != false ? 1 : 0));
        canalFetchData.setType(Integer.valueOf(message.getType().getNumber()));
        canalFetchData.setBizData(JSON.toJSONString(message.getData()));
        canalFetchData.setOldData(JSON.toJSONString(message.getOld()));
        canalFetchData.setPkNames(JSON.toJSONString(message.getPkNames()));
        canalFetchData.setSqlType(JSON.toJSONString(message.getSqlType()));
        canalFetchData.setMysqlType(JSON.toJSONString(message.getMysqlType()));
        canalFetchData.setCreateTime(new Date());
        canalFetchData.setUpdateTime(new Date());
        return canalFetchData;
    }
}

