客户端通过Redis缓存获取知识包

当服务端启用集群的部署方式后,我们可以选择将知识包放到Redis中缓存,客户端需要更新知识包时,去Redis中获取而不是去服务端获取。

服务端需要实现 ClusterPacketCacheAdapter ,在以下方法中,操作Redis中的缓存。

方法 说明
putPacket 第一次和重置缓存时触发
remove 删除知识包缓存时触发
refreshPacket 刷新知识包缓存时触发
removeProject 删除项目时触发
recacheAllPackets 重置全部知识包缓存时触发

实现参考

1、定义缓存操作接口

import com.bstek.urule.runtime.KnowledgePackage;

/**
 * 自定义知识包缓存接口
 */
public interface CustomKnowledgeCacheService {
    public static final String BEAN_ID = "urule.ext.customKnowledgeCache";
    //缓存中放入知识包
    public void putCache(String key ,KnowledgePackage value);
    //删除知识包
    public void removeCache(String key);
    //删除所有知识包
    public Long removeAllCache(String prefixKey);
}

2、知识包Reids缓存实现类

import java.util.Set;
import javax.annotation.Resource;

import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import com.bstek.urule.Utils;
import com.bstek.urule.console.cache.packet.PacketData;
import com.bstek.urule.runtime.KnowledgePackage;

import lombok.extern.slf4j.Slf4j;
@Slf4j
@Component
public class RedisKnowledgeCacheServiceImpl implements CustomKnowledgeCacheService {

    @Resource
    RedisTemplate redisTemplate;

    @Value("${project.urule.packetcache.prefix:packetcache:packetcache_}")
    String packetCachePrefix;

    @Value("${project.urule.packetcache.timestampsuffix:_timestamp}")
    String timestampSuffix;
    /**
     * 在Redis中存入知识包的同时,存入一个时间戳,用来更新知识包时进行比对
     */
    @Override
    public void putCache(String packetKey, KnowledgePackage knowledgePackage) {

        String knowledgePackageTimestamp = String.valueOf(knowledgePackage.getTimestamp());
        String redisKnowledgePackageTimestamp = (String) redisTemplate.opsForValue().get(packetCachePrefix + packetKey + timestampSuffix);
        if (redisKnowledgePackageTimestamp == null || !knowledgePackageTimestamp.equals(redisKnowledgePackageTimestamp)) {
            String knowledgePackageContent = Utils.knowledgePackageToString(knowledgePackage);
            log.info("set redis id{},timestamp={},redistimestamp={}", packetCachePrefix + packetKey, knowledgePackageTimestamp,redisKnowledgePackageTimestamp);
            redisTemplate.opsForValue().set(packetCachePrefix + packetKey, knowledgePackageContent);
            redisTemplate.opsForValue().set(packetCachePrefix + packetKey + timestampSuffix, knowledgePackageTimestamp);
        }

    }

    @Override
    public void removeCache(String packetKey) {
        redisTemplate.delete(packetCachePrefix + packetKey);
        redisTemplate.delete(packetCachePrefix + packetKey + timestampSuffix);
    }

    @Override
    public Long removeAllCache(String key) {
        Set<String> keys = redisTemplate.keys(packetCachePrefix+key);
        if(!CollectionUtils.isEmpty(keys)) {
            return redisTemplate.delete(keys);
        }

        return null;
    }


}

3、在 ClusterPacketCacheAdapter实现中调用 customKnowledgeCacheService

/**
 * 知识包缓存更新消息通知类
 */
@Slf4j
@Component("urule.clusterPacketCacheAdapter")
public class MsgClusterPacketCacheAdapter implements ClusterPacketCacheAdapter{

    @Autowired
    private CustomKnowledgeCacheService customKnowledgeCacheService;

    @Autowired
    private CustomProducerService customProducerService;

    public List<Map<String, Object>> recacheAllPackets(String groupId) {
        List<Map<String,Object>> result=new ArrayList<Map<String,Object>>();
        log.info("recacheAllPackets(String groupId):"+groupId);
        customKnowledgeCacheService.removeAllCache("*");

        ObjectNode msg = JsonUtils.getObjectJsonMapper().createObjectNode();
        msg.put("groupId", groupId);
        msg.put("systemId", Utils.SystemId);
        msg.put("messageType", MQConstant.QUEUE_CLUSTER_PACKET_REFRESHALL);
        customProducerService.sendMessage(MQConstant.CLUSTER_TOPIC, msg.toString());
        return result;
    }
    @Override
    public List<Map<String, Object>> refreshPacket(String groupId, long packetId) {
        List<Map<String,Object>> result=new ArrayList<Map<String,Object>>();
        Packet packet = PacketManager.ins.load(packetId);
        String packetCode = packet.getCode();
        PacketData packetData =PacketCache.ins.getPacket(packetId);
        //缓存知识包
        if(packetData!=null) {
            KnowledgePackage knowledgePackage = packetData.getKnowledgePackageWrapper().getKnowledgePackage();
            customKnowledgeCacheService.putCache(String.valueOf(packetId), knowledgePackage);
            customKnowledgeCacheService.putCache(packetCode, knowledgePackage);
        }
        //通知服务端集群节点

        log.info("refreshPacket(String groupId, long packetId):{}:{}:{}",groupId,packetId,packetCode);
        ObjectNode clustermsg = JsonUtils.getObjectJsonMapper().createObjectNode();
        clustermsg.put("groupId", groupId);
        clustermsg.put("systemId", Utils.SystemId);
        clustermsg.put("packetId", String.valueOf(packetId));
        clustermsg.put("packetCode", packetCode);
        clustermsg.put("messageType", MQConstant.QUEUE_CLUSTER_PACKET_REFRESH);
        customProducerService.sendMessage(MQConstant.CLUSTER_TOPIC, clustermsg.toString());

        ObjectNode clientmsg = JsonUtils.getObjectJsonMapper().createObjectNode();
        clientmsg.put("groupId", groupId);
        clientmsg.put("systemId", Utils.SystemId);
        clientmsg.put("packetId", String.valueOf(packetId));
        clientmsg.put("packetCode", packetCode);
        clientmsg.put("messageType", MQConstant.QUEUE_CLIENT_PACKET_REFRESH);
        customProducerService.sendMessage(MQConstant.CLIENT_TOPIC, clientmsg.toString());

        return result;
    }
    @Override
    public List<Map<String, Object>> removeProject(String groupId, long projectId, List<PacketConfig> list) {
        List<Map<String,Object>> result=new ArrayList<Map<String,Object>>();

        log.info("removeProject(String paramString, long paramLong, List<PacketConfig> paramList):"+projectId);
        for(PacketConfig pc:list) {
            disableClientsPacket(groupId,pc.getId(),pc.getCode());
            customKnowledgeCacheService.removeCache(String.valueOf(pc.getId()));
            customKnowledgeCacheService.removeCache(pc.getCode());
        }  
        ObjectNode msg = JsonUtils.getObjectJsonMapper().createObjectNode();
        msg.put("groupId", groupId);
        msg.put("systemId", Utils.SystemId);
        msg.put("projectId", String.valueOf(projectId));
        msg.put("messageType", MQConstant.QUEUE_CLUSTER_PROJECT_REMOVE);
        customProducerService.sendMessage(MQConstant.CLUSTER_TOPIC, msg.toString());
        return result;
    }


    @Override
    public void putPacket(long packetId, PacketData paramPacketData) {
        // TODO Auto-generated method stub
        log.info("putPacket(long packetId){}", packetId);
        if (paramPacketData == null) {
            throw new RuleException("Put Package to Redis [id=" + packetId + "] not exist");
        }
        KnowledgePackage knowledgePackage = paramPacketData.getKnowledgePackageWrapper().getKnowledgePackage();
        customKnowledgeCacheService.putCache(String.valueOf(packetId), knowledgePackage);
    }

    @Override
    public void putPacket(String packetCode, PacketData paramPacketData) {
        // TODO Auto-generated method stub
        log.info("putPacket(String packetCode):{}", packetCode);
        if (paramPacketData == null) {
            throw new RuleException("Put Package to Redis [code=" + packetCode + "] not exist");
        }
        KnowledgePackage knowledgePackage = paramPacketData.getKnowledgePackageWrapper().getKnowledgePackage();

        customKnowledgeCacheService.putCache(packetCode, knowledgePackage);

    }

    @Override
    public void remove(long packetId) {
        // TODO Auto-generated method stub
        log.info("remove(String packetId):{}", packetId);
        customKnowledgeCacheService.removeCache(String.valueOf(packetId));
    }

    @Override
    public void remove(String packetCode) {
        // TODO Auto-generated method stub
        log.info("remove(String packetCode):{}", packetCode);
        customKnowledgeCacheService.removeCache(packetCode);
    }

    public List<Map<String, Object>> disableClientsPacket(String groupId, long packetId,String packetCode) {
        List<Map<String,Object>> result=new ArrayList<Map<String,Object>>();
        log.info("disableClientsPacket(String groupId:{}, long packetId):{},code:{}" ,groupId,packetId,packetCode);
        ObjectNode msg = JsonUtils.getObjectJsonMapper().createObjectNode();
        msg.put("groupId", groupId);
        msg.put("systemId", Utils.SystemId);
        msg.put("packetId", String.valueOf(packetId));
        msg.put("packetCode", packetCode);
        msg.put("messageType", MQConstant.QUEUE_CLIENT_PACKET_DISABLE);
        customProducerService.sendMessage(MQConstant.CLIENT_TOPIC, msg.toString());
        return result;
    }


}

4、客户端配置从Redis中获取知识包

results matching ""

    No results matching ""