aboutsummaryrefslogtreecommitdiffstats
path: root/crates/sh-util/src/cache/mod.rs
diff options
context:
space:
mode:
authorrtkay123 <dev@kanjala.com>2026-04-05 15:17:55 +0200
committerrtkay123 <dev@kanjala.com>2026-04-05 15:17:55 +0200
commit3f708c5fffed105b27965f8e844a26de6bdf9662 (patch)
treefbed157ae7fc15a26a86fba5e0b8b9c5107ee07f /crates/sh-util/src/cache/mod.rs
parente86366c6d68b9d3d2af4ac4afb5cf7d5a8400dde (diff)
downloadsellershut-3f708c5fffed105b27965f8e844a26de6bdf9662.tar.bz2
sellershut-3f708c5fffed105b27965f8e844a26de6bdf9662.zip
feat(cli): cache
Diffstat (limited to 'crates/sh-util/src/cache/mod.rs')
-rw-r--r--crates/sh-util/src/cache/mod.rs176
1 files changed, 176 insertions, 0 deletions
diff --git a/crates/sh-util/src/cache/mod.rs b/crates/sh-util/src/cache/mod.rs
new file mode 100644
index 0000000..67a5121
--- /dev/null
+++ b/crates/sh-util/src/cache/mod.rs
@@ -0,0 +1,176 @@
+mod cluster;
+mod sentinel;
+pub use sentinel::SentinelConfig;
+
+use std::{sync::Arc, time::Duration};
+
+use bb8::RunError;
+// use bb8_redis::RedisConnectionManager;
+use futures_util::lock::Mutex;
+use redis::{
+ AsyncConnectionConfig, ProtocolVersion, RedisConnectionInfo, RedisError, TlsMode,
+ aio::ConnectionManagerConfig, sentinel::SentinelNodeConnectionInfo,
+};
+
+pub use self::cluster::RedisClusterConnectionManager;
+
+pub const REDIS_CONN_TIMEOUT: Duration = Duration::from_secs(2);
+
+pub enum RedisVariant {
+ Clustered,
+ NonClustered,
+ Sentinel(sentinel::SentinelConfig),
+}
+
+#[derive(Clone)]
+pub enum RedisManager {
+ Clustered(redis::cluster_async::ClusterConnection),
+ NonClustered(redis::aio::ConnectionManager),
+ Sentinel(Arc<Mutex<redis::sentinel::SentinelClient>>),
+}
+
+impl RedisManager {
+ pub async fn new(dsn: &str, variant: RedisVariant) -> Self {
+ match variant {
+ RedisVariant::Clustered => {
+ let cli = redis::cluster::ClusterClient::builder(vec![dsn])
+ .retries(1)
+ .connection_timeout(REDIS_CONN_TIMEOUT)
+ .build()
+ .expect("Error initializing redis-unpooled cluster client");
+ let con = cli
+ .get_async_connection()
+ .await
+ .expect("Failed to get redis-cluster-unpooled connection");
+ RedisManager::Clustered(con)
+ }
+ RedisVariant::NonClustered => {
+ let cli =
+ redis::Client::open(dsn).expect("Error initializing redis unpooled client");
+ let con = redis::aio::ConnectionManager::new_with_config(
+ cli,
+ ConnectionManagerConfig::new()
+ .set_number_of_retries(1)
+ .set_connection_timeout(Some(REDIS_CONN_TIMEOUT)),
+ )
+ .await
+ .expect("Failed to get redis-unpooled connection manager");
+ RedisManager::NonClustered(con)
+ }
+ RedisVariant::Sentinel(cfg) => {
+ let tls_mode = if cfg.redis_tls_mode_secure {
+ TlsMode::Secure
+ } else {
+ TlsMode::Insecure
+ };
+ let protocol = if cfg.redis_use_resp3 {
+ ProtocolVersion::RESP3
+ } else {
+ ProtocolVersion::default()
+ };
+
+ let redis_connection_info = RedisConnectionInfo::default()
+ .set_db(cfg.redis_db.unwrap_or(0))
+ .set_protocol(protocol)
+ .set_username(cfg.redis_username.clone())
+ .set_password(cfg.redis_password.clone());
+ let sentinel = SentinelNodeConnectionInfo::default()
+ .set_redis_connection_info(redis_connection_info)
+ .set_tls_mode(tls_mode);
+
+ let cli = redis::sentinel::SentinelClient::build(
+ vec![dsn],
+ cfg.service_name.clone(),
+ Some(sentinel),
+ redis::sentinel::SentinelServerType::Master,
+ )
+ .expect("Failed to build sentinel client");
+
+ RedisManager::Sentinel(Arc::new(Mutex::new(cli)))
+ }
+ }
+ }
+
+ pub async fn get(&self) -> Result<RedisConnection, RunError<RedisError>> {
+ match self {
+ Self::Clustered(conn) => Ok(RedisConnection::Clustered(conn.clone())),
+ Self::NonClustered(conn) => Ok(RedisConnection::NonClustere(conn.clone())),
+ Self::Sentinel(conn) => {
+ let mut conn = conn.lock().await;
+ let con = conn
+ .get_async_connection_with_config(
+ &AsyncConnectionConfig::new()
+ .set_response_timeout(Some(REDIS_CONN_TIMEOUT)),
+ )
+ .await?;
+ Ok(RedisConnection::Sentinel(con))
+ }
+ }
+ }
+}
+
+pub enum RedisConnection {
+ Clustered(redis::cluster_async::ClusterConnection),
+ NonClustere(redis::aio::ConnectionManager),
+ Sentinel(redis::aio::MultiplexedConnection),
+}
+
+impl redis::aio::ConnectionLike for RedisConnection {
+ fn req_packed_command<'a>(
+ &'a mut self,
+ cmd: &'a redis::Cmd,
+ ) -> redis::RedisFuture<'a, redis::Value> {
+ match self {
+ RedisConnection::Clustered(conn) => conn.req_packed_command(cmd),
+ RedisConnection::NonClustere(conn) => conn.req_packed_command(cmd),
+ RedisConnection::Sentinel(conn) => conn.req_packed_command(cmd),
+ }
+ }
+
+ fn req_packed_commands<'a>(
+ &'a mut self,
+ cmd: &'a redis::Pipeline,
+ offset: usize,
+ count: usize,
+ ) -> redis::RedisFuture<'a, Vec<redis::Value>> {
+ match self {
+ RedisConnection::Clustered(conn) => conn.req_packed_commands(cmd, offset, count),
+ RedisConnection::NonClustere(conn) => conn.req_packed_commands(cmd, offset, count),
+ RedisConnection::Sentinel(conn) => conn.req_packed_commands(cmd, offset, count),
+ }
+ }
+
+ fn get_db(&self) -> i64 {
+ match self {
+ RedisConnection::Clustered(conn) => conn.get_db(),
+ RedisConnection::NonClustere(conn) => conn.get_db(),
+ RedisConnection::Sentinel(conn) => conn.get_db(),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use redis::AsyncCommands;
+
+ use super::RedisManager;
+
+ // Ensure basic set/get works -- should test sharding as well:
+ #[tokio::test]
+ // run with `cargo test -- --ignored redis` only when redis is up and configured
+ #[ignore]
+ async fn test_set_read_random_keys() {
+ let mgr = RedisManager::new(
+ "redis://127.0.0.1:6379/0",
+ super::RedisVariant::NonClustered,
+ )
+ .await;
+ let mut conn = mgr.get().await.unwrap();
+
+ for (val, key) in "abcdefghijklmnopqrstuvwxyz".chars().enumerate() {
+ let key = key.to_string();
+ let _: () = conn.set(key.clone(), val).await.unwrap();
+ assert_eq!(conn.get::<_, usize>(&key).await.unwrap(), val);
+ }
+ }
+}