diff options
| author | rtkay123 <dev@kanjala.com> | 2026-02-03 13:45:46 +0200 |
|---|---|---|
| committer | rtkay123 <dev@kanjala.com> | 2026-02-03 13:45:46 +0200 |
| commit | eb2e86997d47249aa31b703598de13ab2eb96caa (patch) | |
| tree | 9a591adee7d027b305d07a04987b5559b99f4d37 /src/server/state/cache/mod.rs | |
| parent | 0ea3cb1d4743b922fbc6e07037096e75caffba8f (diff) | |
| download | sellershut-eb2e86997d47249aa31b703598de13ab2eb96caa.tar.bz2 sellershut-eb2e86997d47249aa31b703598de13ab2eb96caa.zip | |
Diffstat (limited to 'src/server/state/cache/mod.rs')
| -rw-r--r-- | src/server/state/cache/mod.rs | 240 |
1 files changed, 240 insertions, 0 deletions
diff --git a/src/server/state/cache/mod.rs b/src/server/state/cache/mod.rs new file mode 100644 index 0000000..09af5f7 --- /dev/null +++ b/src/server/state/cache/mod.rs @@ -0,0 +1,240 @@ +mod cluster; +mod sentinel; + +use anyhow::Result; +use redis::{ + AsyncConnectionConfig, ProtocolVersion, RedisConnectionInfo, RedisError, TlsMode, + aio::ConnectionManagerConfig, sentinel::SentinelNodeConnectionInfo, +}; +use std::{fmt::Debug, sync::Arc}; + +use bb8_redis::{ + RedisConnectionManager, + bb8::{self, Pool, RunError}, +}; +use tokio::sync::Mutex; + +use crate::{ + config::cache::{CacheConfig, RedisVariant}, + server::state::cache::{ + cluster::RedisClusterConnectionManager, sentinel::RedisSentinelConnectionManager, + }, +}; + +const REDIS_CONN_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(2); + +#[derive(Clone)] +pub enum RedisManager { + Clustered(Pool<RedisClusterConnectionManager>), + NonClustered(Pool<RedisConnectionManager>), + Sentinel(Pool<RedisSentinelConnectionManager>), + ClusteredUnpooled(redis::cluster_async::ClusterConnection), + NonClusteredUnpooled(redis::aio::ConnectionManager), + SentinelUnpooled(Arc<Mutex<redis::sentinel::SentinelClient>>), +} + +impl Debug for RedisManager { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Clustered(arg0) => f.debug_tuple("Clustered").field(arg0).finish(), + Self::NonClustered(arg0) => f.debug_tuple("NonClustered").field(arg0).finish(), + Self::Sentinel(arg0) => f.debug_tuple("Sentinel").field(arg0).finish(), + Self::ClusteredUnpooled(_arg0) => f.debug_tuple("ClusteredUnpooled").finish(), + Self::NonClusteredUnpooled(arg0) => { + f.debug_tuple("NonClusteredUnpooled").field(arg0).finish() + } + Self::SentinelUnpooled(_arg0) => f.debug_tuple("SentinelUnpooled").finish(), + } + } +} + +pub enum RedisConnection<'a> { + Clustered(bb8::PooledConnection<'a, RedisClusterConnectionManager>), + NonClustered(bb8::PooledConnection<'a, RedisConnectionManager>), + SentinelPooled(bb8::PooledConnection<'a, RedisSentinelConnectionManager>), + ClusteredUnpooled(redis::cluster_async::ClusterConnection), + NonClusteredUnpooled(redis::aio::ConnectionManager), + SentinelUnpooled(redis::aio::MultiplexedConnection), +} + +impl redis::aio::ConnectionLike for RedisConnection<'_> { + fn req_packed_command<'a>( + &'a mut self, + cmd: &'a redis::Cmd, + ) -> redis::RedisFuture<'a, redis::Value> { + match self { + RedisConnection::Clustered(conn) => conn.req_packed_command(cmd), + RedisConnection::NonClustered(conn) => conn.req_packed_command(cmd), + RedisConnection::ClusteredUnpooled(conn) => conn.req_packed_command(cmd), + RedisConnection::NonClusteredUnpooled(conn) => conn.req_packed_command(cmd), + RedisConnection::SentinelPooled(conn) => conn.req_packed_command(cmd), + RedisConnection::SentinelUnpooled(conn) => conn.req_packed_command(cmd), + } + } + + fn req_packed_commands<'a>( + &'a mut self, + cmd: &'a redis::Pipeline, + offset: usize, + count: usize, + ) -> redis::RedisFuture<'a, Vec<redis::Value>> { + match self { + RedisConnection::Clustered(conn) => conn.req_packed_commands(cmd, offset, count), + RedisConnection::NonClustered(conn) => conn.req_packed_commands(cmd, offset, count), + RedisConnection::ClusteredUnpooled(conn) => { + conn.req_packed_commands(cmd, offset, count) + } + RedisConnection::NonClusteredUnpooled(conn) => { + conn.req_packed_commands(cmd, offset, count) + } + RedisConnection::SentinelPooled(conn) => conn.req_packed_commands(cmd, offset, count), + RedisConnection::SentinelUnpooled(conn) => conn.req_packed_commands(cmd, offset, count), + } + } + + fn get_db(&self) -> i64 { + match self { + RedisConnection::Clustered(conn) => conn.get_db(), + RedisConnection::NonClustered(conn) => conn.get_db(), + RedisConnection::ClusteredUnpooled(conn) => conn.get_db(), + RedisConnection::NonClusteredUnpooled(conn) => conn.get_db(), + RedisConnection::SentinelPooled(conn) => conn.get_db(), + RedisConnection::SentinelUnpooled(conn) => conn.get_db(), + } + } +} + +impl RedisManager { + pub async fn new(config: &CacheConfig) -> Result<Self> { + if config.pooled { + Self::new_pooled( + config.redis_dsn.as_ref(), + &config.kind, + config.max_connections, + ) + .await + } else { + Self::new_unpooled(config.redis_dsn.as_ref(), &config.kind).await + } + } + async fn new_pooled(dsn: &str, variant: &RedisVariant, max_conns: u16) -> Result<Self> { + match variant { + RedisVariant::Clustered => { + let mgr = RedisClusterConnectionManager::new(dsn)?; + let pool = bb8::Pool::builder() + .max_size(max_conns.into()) + .build(mgr) + .await?; + Ok(RedisManager::Clustered(pool)) + } + RedisVariant::NonClustered => { + let mgr = RedisConnectionManager::new(dsn)?; + let pool = bb8::Pool::builder() + .max_size(max_conns.into()) + .build(mgr) + .await?; + Ok(RedisManager::NonClustered(pool)) + } + RedisVariant::Sentinel(cfg) => { + let mgr = RedisSentinelConnectionManager::new( + vec![dsn], + cfg.service_name.clone(), + Some(create_config(cfg)), + )?; + let pool = bb8::Pool::builder() + .max_size(max_conns.into()) + .build(mgr) + .await?; + Ok(RedisManager::Sentinel(pool)) + } + } + } + + async fn new_unpooled(dsn: &str, variant: &RedisVariant) -> Result<Self> { + match variant { + RedisVariant::Clustered => { + let cli = redis::cluster::ClusterClient::builder(vec![dsn]) + .retries(1) + .connection_timeout(REDIS_CONN_TIMEOUT) + .build()?; + let con = cli.get_async_connection().await?; + Ok(RedisManager::ClusteredUnpooled(con)) + } + RedisVariant::NonClustered => { + let cli = redis::Client::open(dsn)?; + let con = redis::aio::ConnectionManager::new_with_config( + cli, + ConnectionManagerConfig::new() + .set_number_of_retries(1) + .set_connection_timeout(Some(REDIS_CONN_TIMEOUT)), + ) + .await?; + Ok(RedisManager::NonClusteredUnpooled(con)) + } + RedisVariant::Sentinel(cfg) => { + let cli = redis::sentinel::SentinelClient::build( + vec![dsn], + cfg.service_name.clone(), + Some(create_config(cfg)), + redis::sentinel::SentinelServerType::Master, + )?; + + Ok(RedisManager::SentinelUnpooled(Arc::new(Mutex::new(cli)))) + } + } + } + + pub async fn get(&self) -> Result<RedisConnection<'_>, RunError<RedisError>> { + match self { + Self::Clustered(pool) => Ok(RedisConnection::Clustered(pool.get().await?)), + Self::NonClustered(pool) => Ok(RedisConnection::NonClustered(pool.get().await?)), + Self::Sentinel(pool) => Ok(RedisConnection::SentinelPooled(pool.get().await?)), + Self::ClusteredUnpooled(conn) => Ok(RedisConnection::ClusteredUnpooled(conn.clone())), + Self::NonClusteredUnpooled(conn) => { + Ok(RedisConnection::NonClusteredUnpooled(conn.clone())) + } + Self::SentinelUnpooled(conn) => { + let mut conn = conn.lock().await; + let con = conn + .get_async_connection_with_config( + &AsyncConnectionConfig::new() + .set_response_timeout(Some(REDIS_CONN_TIMEOUT)), + ) + .await?; + Ok(RedisConnection::SentinelUnpooled(con)) + } + } + } +} + +fn create_config(cfg: &crate::config::cache::SentinelConfig) -> SentinelNodeConnectionInfo { + let tls_mode = cfg.redis_tls_mode_secure.then_some(TlsMode::Secure); + let protocol = if cfg.redis_use_resp3 { + ProtocolVersion::RESP3 + } else { + ProtocolVersion::default() + }; + let info = RedisConnectionInfo::default(); + let info = if let Some(pass) = &cfg.redis_password { + info.set_password(pass.clone()) + } else { + info + }; + + let info = if let Some(user) = &cfg.redis_username { + info.set_username(user.clone()) + } else { + info + } + .set_protocol(protocol.clone()) + .set_db(cfg.redis_db.unwrap_or(0)); + + let sent_info = SentinelNodeConnectionInfo::default(); + + if let Some(tls) = tls_mode { + sent_info.set_tls_mode(tls) + } else { + sent_info + } + .set_redis_connection_info(info) +} |
