消息(ActiveMQ / RocketMQ)
xly 中不是所有集成都适合同步 HTTP 调用。框架运行两个消息代理,它们职责不同:
| Broker | 用途 | Producer | Consumer |
|---|---|---|---|
| ActiveMQ / JMS | 集群内扇出事件:基础数据合并作业(把每租户行合并成扁平查询表)以及单据更新 / 单据删除通知。尽管其中一个队列有历史命名,这条通道不用于 Redis 缓存失效;真实缓存清理路径见元数据变更后的缓存失效。 | xlyErpJmsProductor |
xlyErpJmsConsumer |
| RocketMQ | 其他不适合 ActiveMQ 假设的集成流程。 |
RocketMQServiceImpl(位于 xlyBusinessService) |
因服务而异。 |
本页只是指针而不是深入说明;准确的队列名和载荷在 xlyErpJmsConsumer/src/main/java/com/xly/xlyerpjmsconsumer/ 下按 consumer thread 级别记录。
ActiveMQ / JMS:基础数据合并 + 扇出通道
Producer 侧队列声明位于 xlyErpJmsProductor/src/main/java/com/xly/xlyerpjmsproductor/config/P2pQueue.java。完整集合是 24 个 destination,可按意图分组:
模块 / 控制(2)
| Constant | 用途 |
|---|---|
ERP_JMS_ACTIVEMQ_CHANGE_GDS_MODULE |
“模块元数据已变更”,ConsumerChangeGdsModuleThread 会运行存储过程 PRO_ERPMERGEBASEGDSMODULE,把每租户 gdsmodule 行合并到扁平基础查询表中。尽管名字如此,它不清 Redis 缓存;Redis 缓存清理由 BACK 保存时通过 @CacheEvict 同步完成。见元数据变更后的缓存失效。 |
ERP_JMS_ACTIVEMQ_CHANGE_WORK_ORDER_CONTROL |
工单控制状态变更(状态 / 汇总标志),用于下游重算扇出。 |
单据操作(6)
| Constant | 用途 |
|---|---|
ERP_JMS_ACTIVEMQ_UPD_SALE_ORDER / _UPD_WORK_ORDER / _UPD_PRODUCTION_REPORT
|
“单据已更新”通知,由后台消费者处理(合计重算、下游失效等)。 |
ERP_JMS_ACTIVEMQ_DEL_SALE_ORDER / _DEL_WORK_ORDER / _DEL_PRODUCTION_REPORT
|
单据删除通知。 |
主数据变更扇出(7):CHANGE_ELE_*
| Constant | 用途 |
|---|---|
_CHANGE_ELE_CUSTOMER |
客户主数据变更。 |
_CHANGE_ELE_EMPLOYEE |
员工主数据变更。 |
_CHANGE_ELE_MACHINE |
车间机台主数据变更。 |
_CHANGE_ELE_MATERIALS |
材料主数据变更。 |
_CHANGE_ELE_PRODUCT |
产品主数据变更。 |
_CHANGE_ELE_PROCESS |
工序主数据变更。 |
_CHANGE_ELE_TEAM |
班组主数据变更。 |
系统信息 / 字典表变更扇出(9):CHANGE_SIS_*
| Constant | 用途 |
|---|---|
_CHANGE_SIS_CUSTOMER_CLASSIFY |
客户分类变更。 |
_CHANGE_SIS_DELIVER |
送货方式变更。 |
_CHANGE_SIS_FORMULA |
计算公式变更。 |
_CHANGE_SIS_PAYMENT |
付款方式变更。 |
_CHANGE_SIS_PROCESS_CLASSIFY |
工序分类变更。 |
_CHANGE_SIS_PRODUCT_CLASSIFY |
产品分类变更。 |
_CHANGE_SIS_SALES_MAN |
销售人员变更。 |
_CHANGE_SIS_TAX |
税率变更。 |
_CHANGE_SIS_WORK_CENTER |
工作中心变更。 |
(第一张表之后省略 constant 前缀为 _…;完整字面量是 ERP_JMS_ACTIVEMQ_…。)
Listener 侧
xlyErpJmsConsumer/.../consumer/Consumer.java 是承载全部 24 个 @JmsListener 方法的单一类,每个 destination 一个方法。每个方法把载荷分发给 xlyErpJmsConsumer/.../thread/ 下对应的 Consumer*Thread 类,由它异步执行领域工作(通常是调用一个 PRO_ERPMERGEBASE* 存储过程,把每租户行合并成扁平基础查询表)。这里是一个 listener 类有 24 个方法,不是 24 个 listener 类。consumer thread 中没有调用 @CacheEvict 或 cleanRedis*;Redis 缓存失效在 BACK 保存时同步完成,见 cache-invalidation.md。
RocketMQ:其他流程
RocketMQServiceImpl 及其配套 RocketMQService 接口位于 xlyBusinessService/src/main/java/com/xly/service/。它们覆盖非缓存失效流程(例如 MQCompensateServiceImpl 处理 ActiveMQ destination 难以表达的补偿 / 重试语义)。RocketMQ topic 按环境配置。
手动触发缓存失效
如果元数据变更是通过原始 SQL 完成的(没有 BACK 保存路径),各节点缓存不会自动清理。支持的覆盖路径是 xlyBusinessService/.../service/impl/ 中的 BusinessCleanRedisDataImpl,它可以直接调用清理方法。更完整的排查路径见元数据变更后的缓存失效。
这个机制不是什么
- 不是公开集成通道。 外部集成方不应向这些 broker 发布消息,也不应订阅它们。它们是集群内部的扇出机制。
-
不是缓存失效通道。
xlyEntry的 HTTP 写路径会在需要时同步触发@CacheEvict;JMS 队列负责基础数据合并和异步工作项。