Java 操作 Redis 的庫有兩個,Jedis 和 Lettuce,目前 SpringBoot 2.x 中已經將 Jedis 換成了 Lettuce,讓我們一起來看看這個東西。
Redis介紹
Redis是一個開源的使用ANSI C語言編寫、支持網絡、可基于內存亦可持久化的日志型、Key-Value數據庫,并提供多種語言的API。相比Memcached它支持存儲的類型相對更多(字符、哈希、集合、有序集合、列表、GEO),同時Redis是線程安全的。2010年3月15日起,Redis的開發(fā)工作由VMware主持,2013年5月開始,Redis的開發(fā)由Pivotal贊助。
Lettuce
Lettuce和Jedis的都是連接Redis Server的客戶端程序。Jedis在實現上是直連redis server,多線程環(huán)境下非線程安全,除非使用連接池,為每個Jedis實例增加物理連接。Lettuce基于Netty的連接實例(StatefulRedisConnection),可以在多個線程間并發(fā)訪問,且線程安全,滿足多線程環(huán)境下的并發(fā)訪問,同時它是可伸縮的設計,一個連接實例不夠的情況也可以按需增加連接實例
導入依賴
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>5.0.4.RELEASE</version>
</dependency>
單機模式下代碼測試
package testRedis;
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisStringCommands;
public class BasicUsage {
public static void main(String[] args) {
// client
RedisClient client = RedisClient.create("redis://localhost");
// connection, 線程安全的長連接,連接丟失時會自動重連,直到調用 close 關閉連接。
StatefulRedisConnection<String, String> connection = client.connect();
// sync, 默認超時時間為 60s.
RedisStringCommands<String, String> sync = connection.sync();
sync.set("host", "note.abeffect.com");
String value = sync.get("host");
System.out.println(value);
// close connection
connection.close();
// shutdown
client.shutdown();
}
}
集群模式
public class LettuceClusterClient {
public static void main(String[] args) {
ArrayList<RedisURI> list = new ArrayList<>();
list.add(RedisURI.create("redis://192.168.37.128:7000"));
list.add(RedisURI.create("redis://192.168.37.128:7001"));
list.add(RedisURI.create("redis://192.168.37.128:7002"));
list.add(RedisURI.create("redis://192.168.37.128:7003"));
list.add(RedisURI.create("redis://192.168.37.128:7004"));
list.add(RedisURI.create("redis://192.168.37.128:7005"));
RedisClusterClient client = RedisClusterClient.create(list);
//RedisClusterClient client = RedisClusterClient.create("redis://192.168.37.128:7000");
StatefulRedisClusterConnection<String, String> connect = client.connect();
/* 同步執(zhí)行的命令 */
RedisAdvancedClusterCommands<String, String> commands = connect.sync();
String str = commands.get("test2");
System.out.println(str);
/* 異步執(zhí)行的命令 */
// RedisAdvancedClusterAsyncCommands<String, String> commands= connect.async();
// RedisFuture<String> future = commands.get("test2");
// try {
// String str = future.get();
// System.out.println(str);
// } catch (InterruptedException e) {
// e.printStackTrace();
// } catch (ExecutionException e) {
// e.printStackTrace();
// }
connect.close();
client.shutdown();
}
}
其它同步使用方式
設定超時時間為 20s
RedisClient client = RedisClient.create(RedisURI.create("localhost", 6379));
client.setDefaultTimeout(Duration.ofSeconds(20));
使用 RedisURI
RedisURI redisUri = RedisURI.Builder.redis("localhost").withPassword("authentication").withDatabase(2).build();
RedisClient client = RedisClient.create(redisUri);
或者
RedisURI redisUri = RedisURI.create("redis://authentication@localhost/2");
RedisClient client = RedisClient.create(redisUri);
異步使用
異步調用,可以避免將 CPU 浪費在等待網絡 IO 和磁盤 IO 時上,實現提高資源使用率。
基本例子
package testRedis;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisStringAsyncCommands;
public class AsynchronousAPI {
public static void main(String[] args) {
// client
RedisClient client = RedisClient.create("redis://localhost");
// connect
StatefulRedisConnection<String, String> connection = client.connect();
// async
RedisStringAsyncCommands<String, String> async = connection.async();
RedisFuture<String> future = async.get("host");
try {
String value = future.get(60, TimeUnit.SECONDS);
System.out.println(value);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
connection.close();
client.shutdown();
}
}
使用 Consumer 監(jiān)聽器
RedisFuture<String> future = async.get("host");
future.thenAccept(new Consumer<String>() {
@Override
public void accept(String value) {
System.out.println(value);
}
});
使用 Lambda 表達式
RedisFuture<String> future = async.get("host");
future.thenAccept(System.out::println);
使用獨立的線程池
為了防止阻塞默認的線程池,可以在單獨的線程池中執(zhí)行異步請求。
Executor sharedExecutor = Executors.newFixedThreadPool(1);
RedisFuture<String> future = async.get("host");
future.thenAcceptAsync(System.out::println, sharedExecutor);
更多的例子見Asynchronous-API 官方文檔
Reactive 調用
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.reactive.RedisStringReactiveCommands;
public class ReactiveAPI {
public static void main(String[] args) {
// client
RedisClient client = RedisClient.create("redis://localhost");
// connect
StatefulRedisConnection<String, String> connection = client.connect();
// reactive
RedisStringReactiveCommands<String, String> reactive = connection.reactive();
reactive.get("host").subscribe(System.out::println);
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
connection.close();
client.shutdown();
}
}
更多查看Reactive API 官方文檔,或者 Reactive 相關資料。
Pub/Sub
import io.lettuce.core.RedisClient;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.sync.RedisPubSubCommands;
public class PubSubApi {
public static void main(String[] args) {
RedisClient client = RedisClient.create("redis://localhost");
// connection
RedisPubSubListener<String, String> listener = new RedisPubSubListener<String, String>() {
@Override
public void message(String pattern, String channel) {
System.out.println("message:" + pattern + "," + channel);
}
@Override
public void message(String pattern, String channel, String message) {
System.out.println("message:" + pattern + "," + channel + "," + message);
}
@Override
public void psubscribed(String pattern, long count) {
System.out.println("psub:" + pattern + "," + count);
}
@Override
public void punsubscribed(String pattern, long count) {
System.out.println("punsub:" + pattern + "," + count);
}
@Override
public void subscribed(String channel, long count) {
System.out.println("sub:" + channel + "," + count);
}
@Override
public void unsubscribed(String channel, long count) {
System.out.println("ubsub:" + channel + "," + count);
}
};
StatefulRedisPubSubConnection<String, String> pubSubConnection = client.connectPubSub();
pubSubConnection.addListener(listener);
RedisPubSubCommands<String, String> connection = pubSubConnection.sync();
connection.subscribe("channel");
try {
Thread.sleep(100000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
pubSubConnection.close();
client.shutdown();
}
}
啟動中,在 redis 中執(zhí)行:
127.0.0.1:6379> PUBLISH channel 1
(integer) 1
127.0.0.1:6379> PUBLISH channel 2
(integer) 1
127.0.0.1:6379> PUBLISH channel 3
(integer) 1
輸出結果
[DEBUG] (main) Using Console logging
[DEBUG] (main) Starting UnsafeSupport init in Java 1.8
[TRACE] (main) sun.misc.Unsafe.theUnsafe ok
[TRACE] (main) sun.misc.Unsafe.copyMemory ok
[TRACE] (main) java.nio.Buffer.address ok
[DEBUG] (main) Unsafe is available
Aug 19, 2018 4:41:34 PM io.lettuce.core.EpollProvider <clinit>
信息: Starting without optional epoll library
Aug 19, 2018 4:41:34 PM io.lettuce.core.KqueueProvider <clinit>
信息: Starting without optional kqueue library
sub: channel, 1
message: channel, 1
message: channel, 2
message: channel, 3