messaging.md 4.91 KB

消息(ActiveMQ / RocketMQ)

xly 中不是所有集成都适合同步 HTTP 调用。框架运行两个消息代理,它们职责不同:

Broker 用途 Producer Consumer
ActiveMQ / JMS 集群内扇出事件:基础数据合并作业(把每租户行合并成扁平查询表)以及单据更新 / 单据删除通知。尽管其中一个队列有历史命名,这条通道不用于 Redis 缓存失效;真实缓存清理路径见元数据变更后的缓存失效 xlyErpJmsProductor xlyErpJmsConsumer
RocketMQ 其他不适合 ActiveMQ 假设的集成流程。 RocketMQServiceImpl(位于 xlyBusinessService 因服务而异。

本页只是指针而不是深入说明;准确的队列名和载荷在 xlyErpJmsConsumer/src/main/java/com/xly/xlyerpjmsconsumer/ 下按 consumer thread 级别记录。

ActiveMQ / JMS:基础数据合并 + 扇出通道

Producer 侧队列声明位于 xlyErpJmsProductor/src/main/java/com/xly/xlyerpjmsproductor/config/P2pQueue.java。完整集合是 24 个 destination,可按意图分组:

模块 / 控制(2)

Constant 用途
ERP_JMS_ACTIVEMQ_CHANGE_GDS_MODULE “模块元数据已变更”,ConsumerChangeGdsModuleThread 会运行存储过程 PRO_ERPMERGEBASEGDSMODULE,把每租户 gdsmodule 行合并到扁平基础查询表中。尽管名字如此,它不清 Redis 缓存;Redis 缓存清理由 BACK 保存时通过 @CacheEvict 同步完成。见元数据变更后的缓存失效
ERP_JMS_ACTIVEMQ_CHANGE_WORK_ORDER_CONTROL 工单控制状态变更(状态 / 汇总标志),用于下游重算扇出。

单据操作(6)

Constant 用途
ERP_JMS_ACTIVEMQ_UPD_SALE_ORDER / _UPD_WORK_ORDER / _UPD_PRODUCTION_REPORT “单据已更新”通知,由后台消费者处理(合计重算、下游失效等)。
ERP_JMS_ACTIVEMQ_DEL_SALE_ORDER / _DEL_WORK_ORDER / _DEL_PRODUCTION_REPORT 单据删除通知。

主数据变更扇出(7):CHANGE_ELE_*

Constant 用途
_CHANGE_ELE_CUSTOMER 客户主数据变更。
_CHANGE_ELE_EMPLOYEE 员工主数据变更。
_CHANGE_ELE_MACHINE 车间机台主数据变更。
_CHANGE_ELE_MATERIALS 材料主数据变更。
_CHANGE_ELE_PRODUCT 产品主数据变更。
_CHANGE_ELE_PROCESS 工序主数据变更。
_CHANGE_ELE_TEAM 班组主数据变更。

系统信息 / 字典表变更扇出(9):CHANGE_SIS_*

Constant 用途
_CHANGE_SIS_CUSTOMER_CLASSIFY 客户分类变更。
_CHANGE_SIS_DELIVER 送货方式变更。
_CHANGE_SIS_FORMULA 计算公式变更。
_CHANGE_SIS_PAYMENT 付款方式变更。
_CHANGE_SIS_PROCESS_CLASSIFY 工序分类变更。
_CHANGE_SIS_PRODUCT_CLASSIFY 产品分类变更。
_CHANGE_SIS_SALES_MAN 销售人员变更。
_CHANGE_SIS_TAX 税率变更。
_CHANGE_SIS_WORK_CENTER 工作中心变更。

(第一张表之后省略 constant 前缀为 _…;完整字面量是 ERP_JMS_ACTIVEMQ_…。)

Listener 侧

xlyErpJmsConsumer/.../consumer/Consumer.java 是承载全部 24 个 @JmsListener 方法的单一类,每个 destination 一个方法。每个方法把载荷分发给 xlyErpJmsConsumer/.../thread/ 下对应的 Consumer*Thread 类,由它异步执行领域工作(通常是调用一个 PRO_ERPMERGEBASE* 存储过程,把每租户行合并成扁平基础查询表)。这里是一个 listener 类有 24 个方法,不是 24 个 listener 类。consumer thread 中没有调用 @CacheEvictcleanRedis*;Redis 缓存失效在 BACK 保存时同步完成,见 cache-invalidation.md

RocketMQ:其他流程

RocketMQServiceImpl 及其配套 RocketMQService 接口位于 xlyBusinessService/src/main/java/com/xly/service/。它们覆盖非缓存失效流程(例如 MQCompensateServiceImpl 处理 ActiveMQ destination 难以表达的补偿 / 重试语义)。RocketMQ topic 按环境配置。

手动触发缓存失效

如果元数据变更是通过原始 SQL 完成的(没有 BACK 保存路径),各节点缓存不会自动清理。支持的覆盖路径是 xlyBusinessService/.../service/impl/ 中的 BusinessCleanRedisDataImpl,它可以直接调用清理方法。更完整的排查路径见元数据变更后的缓存失效

这个机制不是什么

  • 不是公开集成通道。 外部集成方不应向这些 broker 发布消息,也不应订阅它们。它们是集群内部的扇出机制。
  • 不是缓存失效通道。 xlyEntry 的 HTTP 写路径会在需要时同步触发 @CacheEvict;JMS 队列负责基础数据合并和异步工作项。