使用RabbitMQ实现接口补偿
业务背景
在我们的日常开发中,经常需要调用第三方接口来进行数据传递,在调用接口的过程中,会因为各种原因导致调用的失败。这时我们希望能有一种机制实现对失败的接口的重复调用,并且能够实现人工干预。
实现思路
1、当接口调用失败,记录相关数据到数据库,采用轮询的方式对数据库的记录进行扫描
2、接口调用失败时,记录相关数据到数据库,同时发送消息到 RabbitMQ ,利用 RabbitMQ 的 TTL(Time To Live) 和 DLX(Dead Letter Exchanges) 特性来实现对接口的重复调用
本文采用的方式是第二种,接口调用流程如下图:
RabbitMQ
RabbitMQ 可以通过 TTL(Time To Live)、DLX(Dead Letter Exchanges) 特性实现延迟队列。其原理是给消息设置过期时间,在消息队列上为过期消息指定转发器,这样消息过期后会转发到与指定转发器匹配的队列上,就实现了延时队列。消息流转如下图:
生产者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "127.0.0.1", UserName = "oec2003", Password = "123456" }; using (var connection = factory.CreateConnection()) while (Console.ReadLine() != null) { using (var channel = connection.CreateModel()) { var arguments = new Dictionary<string, object>(); arguments.Add("x-dead-letter-exchange", "exchange-2"); arguments.Add("x-dead-letter-routing-key", "rk-2");
channel.QueueDeclare("queue-1",true,false,false,arguments); channel.ExchangeDeclare("exchange-2", "direct"); channel.QueueDeclare("queue-2", false, false, false, null); channel.QueueBind("queue-2", "exchange-2", "rk-2", null);
var message = "Hello oec2003!"; var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties(); properties.Persistent = true; properties.Expiration = "5000";
channel.BasicPublish("", "queue-1", properties, body); Console.WriteLine($"发送: {message}"); } } Console.ReadKey(); }
|
消费者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "127.0.0.1", UserName = "oec2003", Password = "123456" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare("queue-2", false, false, false, null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine("已接收: {0}", message); }; channel.BasicConsume("queue-2", false, consumer); } Console.ReadLine(); }
|
数据库
在数据库中需要存储接口调用的相关信息,有以下几个用途:
- 记录失败次数,作为消息发送时的依据
- 超过最大的重试次数,需要人工来进行手动重新调用
上面表中只是基础的一些字段,在真实业务中可以根据实际情况进行字段的增加。
最后
本文提供一种很简单的实现接口补偿的方式,希望对您有所帮助,也欢迎私信讨论。
文中示例代码:https://github.com/oec2003/StudySamples/tree/master/RabbitMQDLXDemo