use bb8_redis::bb8; use redis::{ ErrorKind, IntoConnectionInfo, RedisError, sentinel::{SentinelClient, SentinelNodeConnectionInfo, SentinelServerType}, }; use tokio::sync::Mutex; struct LockedSentinelClient(pub(crate) Mutex); /// ConnectionManager that implements `bb8::ManageConnection` and supports /// asynchronous Sentinel connections via `redis::sentinel::SentinelClient` pub struct RedisSentinelConnectionManager { client: LockedSentinelClient, } impl RedisSentinelConnectionManager { pub fn new( info: Vec, service_name: String, node_connection_info: Option, ) -> Result { Ok(RedisSentinelConnectionManager { client: LockedSentinelClient(Mutex::new(SentinelClient::build( info, service_name, node_connection_info, SentinelServerType::Master, )?)), }) } } impl bb8::ManageConnection for RedisSentinelConnectionManager { type Connection = redis::aio::MultiplexedConnection; type Error = RedisError; async fn connect(&self) -> Result { self.client.0.lock().await.get_async_connection().await } async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> { let pong: String = redis::cmd("PING").query_async(conn).await?; match pong.as_str() { "PONG" => Ok(()), _ => Err(( ErrorKind::Server(redis::ServerErrorKind::ResponseError), "ping request", ) .into()), } } fn has_broken(&self, _: &mut Self::Connection) -> bool { false } }