服务端集群节点之间缓存更新通知

如果您部署的服务端集群节点服务之间可以通过确定的URL地址访问,那么您可以通过3.8.集群管理进行集群节点缓存更新的管理。

如果集群节点服务之间无法通过确定的URL地址访问,比如每次节点重启IP都不确定,或者节点个数可能随时随地发生变化,那么可以参考如下实现,通过消息中间件通知的机制实现每个节点的缓存更新。

img

​ 这种方式无需在URule Pro的集群管理中配置每个节点URL地址,需要实现 ClusterPacketCacheAdapterJarCacheAdapter两个URule更新缓存的接口的方法生产消息:

package com.bstek.urule.console.cache.packet;

import java.util.List;
import java.util.Map;
//服务端集群节点知识包缓存
public interface ClusterPacketCacheAdapter {
  public static final String BEAN_ID = "urule.clusterPacketCacheAdapter";

  void putPacket(long packetId, PacketData paramPacketData);

  void putPacket(String packetCode, PacketData paramPacketData);

  void remove(long packetId);

  void remove(String packetCode);
  //刷新知识包缓存时触发
  List<Map<String, Object>> refreshPacket(String groupId, long packetId);
  //重置全部知识包缓存时触发
  List<Map<String, Object>> recacheAllPackets(String groupId);
  //删除项目时触发
  List<Map<String, Object>> removeProject(String groupId, long projectId, List<PacketConfig> list);
}
public abstract class JarCacheAdapter {
   public static String BEAN_ID = "urule.jarCacheAdapter";
   //更新jar热部署时触发
   public abstract List<Map<String, Object>> loadDynamicJars(String groupId, UrlType urlType) throws Exception;
}

接收到消息的消费类中,通过 com.bstek.urule.console.cache.ServerCacheManager里的方法进行当前节点的缓存更新:

方法 说明
reloadPacket(String systemId, long packetId) 重新加载指定id知识包
syncPacketForRemoveProject(String systemId, long projectId) 删除指定项目的知识包缓存
recacheAllPackets(String systemId) 重新加载所有知识包缓存
reloadDynamicJars(String systemId) 重新加载所有Jar缓存

下面做一个参考实现,前面我们使用Redis实现了Session共享,为了方便我们可以继续使用Redis的消息通知功能,你也可以使用更专业的消息中间件来实现,比如Kafka、RabbitMQ等。

1、定义消息通知的常量类

package com.bstek.urule.sample.mq.constant;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * MQ所需常量
 */
@Component
public class MQConstant {
    public static String QUEUE_CLUSTER_PACKET_REFRESHALL = "urule.cluster.knowledge.refreshall";
    public static String QUEUE_CLUSTER_PACKET_REFRESH = "urule.cluster.knowledge.refresh";
    public static String QUEUE_CLUSTER_PROJECT_REMOVE = "urule.cluster.project.remove";
    public static String QUEUE_CLUSTER_JAR_SYNC = "urule.cluster.jar.sync";

    public static String QUEUE_CLIENT_PACKET_DISABLE = "urule.client.knowledge.disable";
    public static String QUEUE_CLIENT_PACKET_ENABLE = "urule.client.knowledge.enable";
    public static String QUEUE_CLIENT_PACKET_REFRESH = "urule.client.knowledge.refresh";
    public static String QUEUE_CLIENT_JAR_SYNC = "urule.client.jar.sync";

    public static String MQIP;

    @Value("${project.urule.mq.ip:127.0.0.1:9092}")
    public void setMQIP(String mqIP) {
        MQIP = mqIP;
    }

    public static String CLUSTER_TOPIC;

    @Value("${project.urule.mq.clusterTopic:urule-cluster-topic}")
    public void setClusterTopic(String clusterTopic) {
        CLUSTER_TOPIC = clusterTopic;
    }

    public static String CLIENT_TOPIC;

    @Value("${project.urule.mq.clientTopic:urule-client-topic}")
    public void setClientTopic(String clientTopic) {
        CLIENT_TOPIC = clientTopic;
    }
}

2、yaml配置

project:
  urule:
    mq: 
      clusterTopic: urule-cluster-topic
      clientTopic: urule-client-topic

3、配置RedisSessionConfig类

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.session.data.redis.config.ConfigureRedisAction;
import org.springframework.session.data.redis.config.annotation.web.http.EnableRedisHttpSession;

@Configuration
//设置session过期时间,默认是1800秒
@EnableRedisHttpSession(maxInactiveIntervalInSeconds = 30 * 60) 
public class RedisSessionConfig {

    @Bean
    public static ConfigureRedisAction configureRedisAction(){
        return ConfigureRedisAction.NO_OP;
    }
    @Autowired
    private RedisConnectionFactory factory;

    @Bean
    public RedisTemplate<String, Object> redisTemplate() {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashValueSerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new StringRedisSerializer());
        redisTemplate.setConnectionFactory(factory);
        return redisTemplate;
    }
}

4、配置RedisConsumerConfig消息消费类

package com.bstek.urule.sample.mq.redis.config;

import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;

import com.bstek.urule.sample.mq.constant.MQConstant;
import com.bstek.urule.sample.mq.redis.listener.RedisConsumerListener;

/**
 * Redis消费频道配置
 *
 */
@Component
public class RedisConsumerConfig {

    @Bean
    public MessageListenerAdapter messageListenerAdapter() {
        return new MessageListenerAdapter(new RedisConsumerListener());
    }

    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter messageListenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(messageListenerAdapter, new PatternTopic(MQConstant.CLUSTER_TOPIC));

        return container;
    }


}

5、为了切换消息中间件方便,定义了一个生产消息的接口

package com.bstek.urule.sample.mq;

public interface CustomProducerService {
    public static final String BEAN_ID = "urule.ext.CustomProducerService";
    public void sendMessage(String topic, String message);
}

6、定义生产消息的实现类

package com.bstek.urule.sample.mq.redis.service;

import javax.annotation.Resource;

import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import com.bstek.urule.sample.mq.CustomProducerService;
/**
 * Reids消息发送类
 */
@Service
public class RedisProducerServiceImpl implements CustomProducerService{
    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    public void sendMessage(String topic, String message){
        redisTemplate.convertAndSend(topic,message);
    }
}

7、定义服务端某一个集群节点知识包缓存发生变化时,发送消息的通知类

package com.bstek.urule.sample.mq.adapter;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import javax.annotation.Resource;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.web.ServerProperties.Tomcat.Threads;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import com.bstek.urule.Utils;
import com.bstek.urule.console.batch.utils.JsonUtils;
import com.bstek.urule.console.cache.packet.ClientPacketCacheAdapter;
import com.bstek.urule.console.cache.packet.ClusterPacketCacheAdapter;
import com.bstek.urule.console.cache.packet.PacketCache;
import com.bstek.urule.console.cache.packet.PacketConfig;
import com.bstek.urule.console.cache.packet.PacketData;
import com.bstek.urule.console.database.manager.packet.PacketManager;
import com.bstek.urule.console.database.model.Packet;
import com.bstek.urule.exception.RuleException;
import com.bstek.urule.runtime.KnowledgePackage;
import com.fasterxml.jackson.databind.node.ObjectNode;

import com.bstek.urule.sample.mq.CustomProducerService;
import com.bstek.urule.sample.mq.constant.MQConstant;
import com.bstek.urule.sample.mq.redis.service.RedisProducerServiceImpl;
import lombok.extern.slf4j.Slf4j;
/**
 * 知识包缓存更新消息通知类
 */
@Slf4j
@Component(ClusterPacketCacheAdapter.BEAN_ID)
public class MsgClusterPacketCacheAdapter implements ClusterPacketCacheAdapter{

    @Autowired
    private CustomProducerService customProducerService;

    public List<Map<String, Object>> recacheAllPackets(String groupId) {
        List<Map<String,Object>> result=new ArrayList<Map<String,Object>>();
        log.info("recacheAllPackets(String groupId):"+groupId);
        ObjectNode msg = JsonUtils.getObjectJsonMapper().createObjectNode();
        msg.put("groupId", groupId);
        msg.put("systemId", Utils.SystemId);
        msg.put("messageType", MQConstant.QUEUE_CLUSTER_PACKET_REFRESHALL);
        customProducerService.sendMessage(MQConstant.CLUSTER_TOPIC, msg.toString());
        return result;
    }
    @Override
    public List<Map<String, Object>> refreshPacket(String groupId, long packetId) {
        List<Map<String,Object>> result=new ArrayList<Map<String,Object>>();

//        PacketData packetData = PacketCache.ins.getPacket(packetId);
//        String packetCode =packetData.getPacket().getCode();
        Packet packet = PacketManager.ins.load(packetId);
        String packetCode = packet.getCode();

        log.info("refreshPacket(String groupId, long packetId):{}:{}:{}",groupId,packetId,packetCode);
        ObjectNode clustermsg = JsonUtils.getObjectJsonMapper().createObjectNode();
        clustermsg.put("groupId", groupId);
        clustermsg.put("systemId", Utils.SystemId);
        clustermsg.put("packetId", String.valueOf(packetId));
        clustermsg.put("packetCode", packetCode);
        clustermsg.put("messageType", MQConstant.QUEUE_CLUSTER_PACKET_REFRESH);
        customProducerService.sendMessage(MQConstant.CLUSTER_TOPIC, clustermsg.toString());

        /* 如果客户端的缓存更新也采用消息中间件通知更新,可以添加如下代码
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        ObjectNode clientmsg = JsonUtils.getObjectJsonMapper().createObjectNode();
        clientmsg.put("groupId", groupId);
        clientmsg.put("systemId", Utils.SystemId);
        clientmsg.put("packetId", String.valueOf(packetId));
        clientmsg.put("packetCode", packetCode);
        clientmsg.put("messageType", MQConstant.QUEUE_CLIENT_PACKET_REFRESH);
        customProducerService.sendMessage(MQConstant.CLIENT_TOPIC, clientmsg.toString());
        */
        return result;
    }
    @Override
    public List<Map<String, Object>> removeProject(String groupId, long projectId, List<PacketConfig> list) {
        List<Map<String,Object>> result=new ArrayList<Map<String,Object>>();

        log.info("removeProject(String paramString, long paramLong, List<PacketConfig> paramList):"+projectId);
        /*如果客户端的缓存更新也采用消息中间件通知更新,可以添加如下代码
        for(PacketConfig pc:list) {
                    disableClientsPacket(groupId,pc.getId(),pc.getCode());
            } */ 
        ObjectNode msg = JsonUtils.getObjectJsonMapper().createObjectNode();
        msg.put("groupId", groupId);
        msg.put("systemId", Utils.SystemId);
        msg.put("projectId", String.valueOf(projectId));
        msg.put("messageType", MQConstant.QUEUE_CLUSTER_PROJECT_REMOVE);
        customProducerService.sendMessage(MQConstant.CLUSTER_TOPIC, msg.toString());
        return result;
    }


    @Override
    public void putPacket(long packetId, PacketData paramPacketData) {
        // TODO Auto-generated method stub
    }

    @Override
    public void putPacket(String packetCode, PacketData paramPacketData) {
        // TODO Auto-generated method stub
    }

    @Override
    public void remove(long packetId) {
        // TODO Auto-generated method stub
    }

    @Override
    public void remove(String packetCode) {
        // TODO Auto-generated method stub
    }
    /*如果客户端的缓存更新也采用消息中间件通知更新,可以添加如下代码
    public List<Map<String, Object>> disableClientsPacket(String groupId, long packetId,String packetCode) {
        List<Map<String,Object>> result=new ArrayList<Map<String,Object>>();
        log.info("disableClientsPacket(String groupId:{}, long packetId):{},code:{}" ,groupId,packetId,packetCode);
        ObjectNode msg = JsonUtils.getObjectJsonMapper().createObjectNode();
        msg.put("groupId", groupId);
        msg.put("systemId", Utils.SystemId);
        msg.put("packetId", String.valueOf(packetId));
        msg.put("packetCode", packetCode);
        msg.put("messageType", MQConstant.QUEUE_CLIENT_PACKET_DISABLE);
        customProducerService.sendMessage(MQConstant.CLIENT_TOPIC, msg.toString());
        return result;
    }
*/

}

8、定义服务端某一个集群节点Jar包缓存发生变化时,发送消息的通知类

package com.bstek.urule.sample.mq.adapter;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.common.errors.TimeoutException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.bstek.urule.Utils;
import com.bstek.urule.console.batch.utils.JsonUtils;
import com.bstek.urule.console.database.model.UrlType;
import com.bstek.urule.console.editor.jar.JarCacheAdapter;
import com.fasterxml.jackson.databind.node.ObjectNode;

import com.bstek.urule.sample.mq.CustomProducerService;
import com.bstek.urule.sample.mq.constant.MQConstant;
import com.bstek.urule.sample.mq.redis.service.RedisProducerServiceImpl;
import lombok.extern.slf4j.Slf4j;
/**
 * Jar包缓存消息发送类
 */
@Slf4j
@Component("urule.jarCacheAdapter")
public class MsgJarCacheAdapter extends JarCacheAdapter {

    @Autowired
    private CustomProducerService customProducerService;


    @Override
    public List<Map<String, Object>> loadDynamicJars(String groupId, UrlType urlType) throws Exception {
        List<Map<String,Object>> result=new ArrayList<Map<String,Object>>();
        log.info("loadDynamicJars(String groupId, UrlType urlType)"+groupId);
        ObjectNode clustermsg = JsonUtils.getObjectJsonMapper().createObjectNode();
        clustermsg.put("groupId", groupId);
        clustermsg.put("systemId", Utils.SystemId);
        clustermsg.put("messageType", MQConstant.QUEUE_CLUSTER_JAR_SYNC);
        customProducerService.sendMessage(MQConstant.CLUSTER_TOPIC, clustermsg.toString());
          /*如果客户端的缓存更新也采用消息中间件通知更新,可以添加如下代码
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            ObjectNode clientmsg = JsonUtils.getObjectJsonMapper().createObjectNode();
            clientmsg.put("groupId", groupId);
            clientmsg.put("systemId", Utils.SystemId);
            clientmsg.put("messageType", MQConstant.QUEUE_CLIENT_JAR_SYNC);
            customProducerService.sendMessage(MQConstant.CLIENT_TOPIC, clientmsg.toString());
        */
        return result;
    }

}

9、定义接收到消息的监听类

package com.bstek.urule.sample.mq.redis.listener;

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;

import com.bstek.urule.Utils;
import com.bstek.urule.console.batch.utils.JsonUtils;
import com.bstek.urule.console.cache.ServerCacheManager;
import com.bstek.urule.runtime.cache.ClientCacheManager;
import com.fasterxml.jackson.core.JsonProcessingException;

import com.bstek.urule.sample.mq.constant.MQConstant;
import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

import javax.annotation.Resource;
/**
 * Redis消息监听处理类
 */
@Slf4j
public class RedisConsumerListener implements MessageListener {


    private static Map<String, Consumer<String>> RULE = new HashMap<>();

    {
        RULE.put(MQConstant.CLUSTER_TOPIC, this::consumerClusterMessage);
//        RULE.put(MQConstant.CLIENT_TOPIC, this::consumerClusterMessage);
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        byte[] b_channel = message.getChannel();
        byte[] b_body = message.getBody();
        try {
            String channel = new String(b_channel);
            String body = new String(b_body);
            log.info("channel is:" + channel + " , body is: " + body);
            RULE.get(channel).accept(body);
        } catch (Exception e) {
        }
    }

    public void consumerClusterMessage(String message) {
        ServerCacheManager serverCacheManager = (ServerCacheManager) Utils.getApplicationContext().getBean("urule.serverCacheManager");
        log.info("consumerClusterMessage exec params is :" + message);

        try {
            HashMap<String,String> mapMessage = JsonUtils.getObjectJsonMapper().readValue(message, HashMap.class);
            String messageType = mapMessage.get("messageType");
            String systemId = mapMessage.get("systemId");
            String groupId = mapMessage.get("groupId");

            String packetId = mapMessage.get("packetId"); 
            String projectId = mapMessage.get("projectId");

            if(MQConstant.QUEUE_CLUSTER_PACKET_REFRESH.equals(messageType)) {
                serverCacheManager.reloadPacket(systemId, Long.valueOf(packetId));

            }else if(MQConstant.QUEUE_CLUSTER_PROJECT_REMOVE.equals(messageType)) {
                serverCacheManager.syncPacketForRemoveProject(systemId, Long.valueOf(projectId));

            }else if(MQConstant.QUEUE_CLUSTER_PACKET_REFRESHALL.equals(messageType)) {
                serverCacheManager.recacheAllPackets(systemId);

            }else if(MQConstant.QUEUE_CLUSTER_JAR_SYNC.equals(messageType)) {
                serverCacheManager.reloadDynamicJars(systemId);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    /*如果客户端的消息消费监听类,可以添加如下代码
    public void consumerClinetMessage(String message) {
        ClientCacheManager clientCacheManager = (ClientCacheManager) Utils.getApplicationContext().getBean("urule.clientCacheManager");
        log.info("consumerClinetMessage exec params is :" + message);
        try {
            HashMap<String,String> mapMessage = JsonUtils.getObjectJsonMapper().readValue(message, HashMap.class);
            String messageType = mapMessage.get("messageType");
            String systemId = mapMessage.get("systemId");
            String groupId = mapMessage.get("groupId");

            String packetId = mapMessage.get("packetId"); 
            String projectId = mapMessage.get("projectId");

            if(MQConstant.QUEUE_CLIENT_PACKET_DISABLE.equals(messageType)) {
                clientCacheManager.disableKnowledge(packetId);

            }else if(MQConstant.QUEUE_CLIENT_PACKET_ENABLE.equals(messageType)) {
                clientCacheManager.enableKnowledge(packetId);

            }else if(MQConstant.QUEUE_CLIENT_PACKET_REFRESH.equals(messageType)) {
                clientCacheManager.reloadKnowledge(packetId);

            }else if(MQConstant.QUEUE_CLIENT_JAR_SYNC.equals(messageType)) {
                clientCacheManager.reloadDynamicJars();
                log.info("=====jar重新加载完成=====");
            }

        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }
}
*/

至此,通过Redis实现的集群节点之间的缓存通知功能就完成了。

results matching ""

    No results matching ""