From 6a6e303334eca67b3171ba110d28f54dc94237d3 Mon Sep 17 00:00:00 2001 From: mohammed jasem alaajel Date: Sun, 17 Jul 2022 09:51:24 +0400 Subject: [PATCH] updated config, added supported for velocity --- .../src/main/resources/example_config.yml | 1 + .../redisbungee/RedisBungeeBungeePlugin.java | 114 +-- .../minecraft/redisbungee/PlayerUtils.java | 20 +- .../RedisBungeeVelocityListener.java | 91 +- .../RedisBungeeVelocityPlugin.java | 830 ++++++++++++------ .../redisbungee/VelocityDataManager.java | 4 +- 6 files changed, 731 insertions(+), 329 deletions(-) diff --git a/RedisBungee-API/src/main/resources/example_config.yml b/RedisBungee-API/src/main/resources/example_config.yml index 24751ef..0d8494c 100644 --- a/RedisBungee-API/src/main/resources/example_config.yml +++ b/RedisBungee-API/src/main/resources/example_config.yml @@ -18,6 +18,7 @@ max-redis-connections: 10 # since redis can support ssl by version 6 you can use ssl in redis bungee too! # but there is more configuration needed to work see https://github.com/ProxioDev/RedisBungee/issues/18 +# in cluster mode using ssl without password is ignored due fact is not supported in Jedis lib useSSL: false # An identifier for this BungeeCord instance. Will randomly generate if leaving it blank. diff --git a/RedisBungee-Bungee/src/main/java/com/imaginarycode/minecraft/redisbungee/RedisBungeeBungeePlugin.java b/RedisBungee-Bungee/src/main/java/com/imaginarycode/minecraft/redisbungee/RedisBungeeBungeePlugin.java index b8e2dc7..3c4a489 100644 --- a/RedisBungee-Bungee/src/main/java/com/imaginarycode/minecraft/redisbungee/RedisBungeeBungeePlugin.java +++ b/RedisBungee-Bungee/src/main/java/com/imaginarycode/minecraft/redisbungee/RedisBungeeBungeePlugin.java @@ -35,6 +35,7 @@ import net.md_5.bungee.config.YamlConfiguration; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import redis.clients.jedis.*; import redis.clients.jedis.exceptions.JedisConnectionException; +import redis.clients.jedis.exceptions.JedisException; import java.io.*; import java.lang.reflect.Field; @@ -486,11 +487,58 @@ public class RedisBungeeBungeePlugin extends Plugin implements RedisBungeePlugin UUIDFetcher.setHttpClient(httpClient); // init lua manager LuaManager luaManager = new LuaManager(this); - if (getRedisBungeeMode() == RedisBungeeMode.CLUSTER) { - this.getRedisClusterTimeScript = luaManager.createScript(IOUtil.readInputStreamAsString(getResourceAsStream("lua/get_cluster_time.lua"))); - } + new RedisTask(this) { + @Override + public Void jedisTask(Jedis jedis) { + // This is more portable than INFO
+ String info = jedis.info(); + for (String s : info.split("\r\n")) { + if (s.startsWith("redis_version:")) { + String version = s.split(":")[1]; + getLogger().info(version + " <- redis version"); + if (!RedisUtil.isRedisVersionRight(version)) { + getLogger().severe("Your version of Redis (" + version + ") is not at least version 3.0 RedisBungee requires a newer version of Redis."); + throw new RuntimeException("Unsupported Redis version detected"); + } + long uuidCacheSize = jedis.hlen("uuid-cache"); + if (uuidCacheSize > 750000) { + getLogger().info("Looks like you have a really big UUID cache! Run https://www.spigotmc.org/resources/redisbungeecleaner.8505/ as soon as possible."); + } + break; + } + } + return null; + } + + @Override + public Void clusterJedisTask(JedisCluster jedisCluster) { + // This is more portable than INFO
+ try { + getRedisClusterTimeScript = luaManager.createScript(IOUtil.readInputStreamAsString(getResourceAsStream("lua/get_cluster_time.lua"))); + } catch (JedisException e) { + throw new RuntimeException("possible not supported redis version", e); + } + String info = (String) luaManager.createScript(IOUtil.readInputStreamAsString(getResourceAsStream("lua/get_cluster_info.lua"))).eval(Collections.singletonList("0"), Collections.emptyList()); + for (String s : info.split("\r\n")) { + if (s.startsWith("redis_version:")) { + String version = s.split(":")[1]; + getLogger().info(version + " <- redis version"); + if (!RedisUtil.isRedisVersionRight(version)) { + getLogger().severe("Your version of Redis (" + version + ") is not at least version 3.0 RedisBungee requires a newer version of Redis."); + throw new RuntimeException("Unsupported Redis version detected"); + } + long uuidCacheSize = jedisCluster.hlen("uuid-cache"); + if (uuidCacheSize > 750000) { + getLogger().info("Looks like you have a really big UUID cache! Run https://www.spigotmc.org/resources/redisbungeecleaner.8505/ as soon as possible."); + } + break; + } + } + return null; + } + }.execute(); getLogger().info("lua manager was loaded"); - // check if this proxy is recovering from a crash + // check if this proxy is recovering from a crash and start heart the beat. new RedisTask(api) { @Override public Void jedisTask(Jedis jedis) { @@ -509,6 +557,8 @@ public class RedisBungeeBungeePlugin extends Plugin implements RedisBungeePlugin } } catch (NumberFormatException ignored) { } + } else { + jedis.hset("heartbeats", configuration.getServerId(), jedis.time().get(0)); } return null; @@ -523,6 +573,7 @@ public class RedisBungeeBungeePlugin extends Plugin implements RedisBungeePlugin try { long value = Long.parseLong(jedisCluster.hget("heartbeats", configuration.getServerId())); long redisTime = getRedisClusterTime(); + if (redisTime < value + 20) { getLogger().severe("You have launched a possible impostor Velocity / Bungeecord instance. Another instance is already running."); getLogger().severe("For data consistency reasons, RedisBungee will now disable itself."); @@ -531,55 +582,8 @@ public class RedisBungeeBungeePlugin extends Plugin implements RedisBungeePlugin } } catch (NumberFormatException ignored) { } - } - return null; - } - }.execute(); - - - new RedisTask(this) { - @Override - public Void jedisTask(Jedis jedis) { - // This is more portable than INFO
- String info = jedis.info(); - for (String s : info.split("\r\n")) { - if (s.startsWith("redis_version:")) { - String version = s.split(":")[1]; - getLogger().info(version + " <- redis version"); - if (!RedisUtil.isRedisVersionRight(version)) { - getLogger().severe("Your version of Redis (" + version + ") is not at least version 3.0 RedisBungee requires a newer version of Redis."); - throw new RuntimeException("Unsupported Redis version detected"); - } - break; - } - } - jedis.hset("heartbeats", configuration.getServerId(), jedis.time().get(0)); - long uuidCacheSize = jedis.hlen("uuid-cache"); - if (uuidCacheSize > 750000) { - getLogger().info("Looks like you have a really big UUID cache! Run https://www.spigotmc.org/resources/redisbungeecleaner.8505/ as soon as possible."); - } - return null; - } - - @Override - public Void clusterJedisTask(JedisCluster jedisCluster) { - // This is more portable than INFO
- String info = (String) luaManager.createScript(IOUtil.readInputStreamAsString(getResourceAsStream("lua/get_cluster_info.lua"))).eval(Collections.singletonList("0"), Collections.emptyList()); - for (String s : info.split("\r\n")) { - if (s.startsWith("redis_version:")) { - String version = s.split(":")[1]; - getLogger().info(version + " <- redis version"); - if (!RedisUtil.isRedisVersionRight(version)) { - getLogger().severe("Your version of Redis (" + version + ") is not at least version 3.0 RedisBungee requires a newer version of Redis."); - throw new RuntimeException("Unsupported Redis version detected"); - } - break; - } - } - jedisCluster.hset("heartbeats", configuration.getServerId(), String.valueOf(getRedisClusterTime())); - long uuidCacheSize = jedisCluster.hlen("uuid-cache"); - if (uuidCacheSize > 750000) { - getLogger().info("Looks like you have a really big UUID cache! Run https://www.spigotmc.org/resources/redisbungeecleaner.8505/ as soon as possible."); + } else { + jedisCluster.hset("heartbeats", configuration.getServerId(), String.valueOf(getRedisClusterTime())); } return null; } @@ -798,7 +802,6 @@ public class RedisBungeeBungeePlugin extends Plugin implements RedisBungeePlugin getProxy().getPluginManager().unregisterListeners(this); - new RedisTask(api) { @Override public Void jedisTask(Jedis jedis) { @@ -886,7 +889,8 @@ public class RedisBungeeBungeePlugin extends Plugin implements RedisBungeePlugin } else { this.jedisSummoner = new ClusterJedisSummoner(new JedisCluster(new HostAndPort(redisServer, redisPort), 5000, 5000, 60, poolConfig)); getLogger().warning("SSL option is ignored in Cluster mode if no PASSWORD is set"); - }this.redisBungeeMode = RedisBungeeMode.CLUSTER; + } + this.redisBungeeMode = RedisBungeeMode.CLUSTER; getLogger().log(Level.INFO, "RedisBungee MODE: CLUSTER"); } else { JedisPoolConfig config = new JedisPoolConfig(); diff --git a/RedisBungee-Velocity/src/main/java/com/imaginarycode/minecraft/redisbungee/PlayerUtils.java b/RedisBungee-Velocity/src/main/java/com/imaginarycode/minecraft/redisbungee/PlayerUtils.java index 3c3a36b..2573871 100644 --- a/RedisBungee-Velocity/src/main/java/com/imaginarycode/minecraft/redisbungee/PlayerUtils.java +++ b/RedisBungee-Velocity/src/main/java/com/imaginarycode/minecraft/redisbungee/PlayerUtils.java @@ -4,13 +4,14 @@ import com.google.gson.Gson; import com.imaginarycode.minecraft.redisbungee.api.AbstractDataManager; import com.velocitypowered.api.proxy.Player; import com.velocitypowered.api.proxy.ServerConnection; +import redis.clients.jedis.JedisCluster; import redis.clients.jedis.Pipeline; import java.util.HashMap; import java.util.Map; import java.util.Optional; -public class RBUtils { +public class PlayerUtils { private static final Gson gson = new Gson(); protected static void createPlayer(Player player, Pipeline pipeline, boolean fireEvent) { @@ -32,5 +33,22 @@ public class RBUtils { } } + protected static void createPlayer(Player player, JedisCluster jedisCluster, boolean fireEvent) { + Optional server = player.getCurrentServer(); + server.ifPresent(serverConnection -> jedisCluster.hset("player:" + player.getUniqueId().toString(), "server", serverConnection.getServerInfo().getName())); + Map playerData = new HashMap<>(4); + playerData.put("online", "0"); + playerData.put("ip", player.getRemoteAddress().getHostName()); + playerData.put("proxy", RedisBungeeAPI.getRedisBungeeApi().getServerId()); + + jedisCluster.sadd("proxy:" + RedisBungeeAPI.getRedisBungeeApi().getServerId() + ":usersOnline", player.getUniqueId().toString()); + jedisCluster.hmset("player:" + player.getUniqueId().toString(), playerData); + + if (fireEvent) { + jedisCluster.publish("redisbungee-data", gson.toJson(new AbstractDataManager.DataManagerMessage<>( + player.getUniqueId(), RedisBungeeAPI.getRedisBungeeApi().getServerId(), AbstractDataManager.DataManagerMessage.Action.JOIN, + new AbstractDataManager.LoginPayload(player.getRemoteAddress().getAddress())))); + } + } } diff --git a/RedisBungee-Velocity/src/main/java/com/imaginarycode/minecraft/redisbungee/RedisBungeeVelocityListener.java b/RedisBungee-Velocity/src/main/java/com/imaginarycode/minecraft/redisbungee/RedisBungeeVelocityListener.java index fd0afe6..afd05c4 100644 --- a/RedisBungee-Velocity/src/main/java/com/imaginarycode/minecraft/redisbungee/RedisBungeeVelocityListener.java +++ b/RedisBungee-Velocity/src/main/java/com/imaginarycode/minecraft/redisbungee/RedisBungeeVelocityListener.java @@ -6,11 +6,12 @@ import com.google.common.collect.Multimap; import com.google.common.io.ByteArrayDataInput; import com.google.common.io.ByteArrayDataOutput; import com.google.common.io.ByteStreams; -import com.imaginarycode.minecraft.redisbungee.internal.AbstractRedisBungeeListener; -import com.imaginarycode.minecraft.redisbungee.internal.AbstractDataManager; -import com.imaginarycode.minecraft.redisbungee.internal.RedisBungeePlugin; +import com.imaginarycode.minecraft.redisbungee.api.AbstractRedisBungeeListener; +import com.imaginarycode.minecraft.redisbungee.api.AbstractDataManager; +import com.imaginarycode.minecraft.redisbungee.api.RedisBungeePlugin; +import com.imaginarycode.minecraft.redisbungee.api.tasks.RedisTask; import com.imaginarycode.minecraft.redisbungee.events.PubSubMessageEvent; -import com.imaginarycode.minecraft.redisbungee.internal.RedisUtil; +import com.imaginarycode.minecraft.redisbungee.api.util.RedisUtil; import com.velocitypowered.api.event.Continuation; import com.velocitypowered.api.event.PostOrder; import com.velocitypowered.api.event.ResultedEvent; @@ -27,6 +28,7 @@ import com.velocitypowered.api.proxy.ServerConnection; import com.velocitypowered.api.proxy.server.ServerPing; import net.kyori.adventure.text.serializer.legacy.LegacyComponentSerializer; import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisCluster; import redis.clients.jedis.Pipeline; import java.net.InetAddress; @@ -43,9 +45,9 @@ public class RedisBungeeVelocityListener extends AbstractRedisBungeeListener(plugin) { + plugin.executeAsync(new RedisTask(plugin) { @Override - protected Void call(Jedis jedis) { + public Void jedisTask(Jedis jedis) { try { if (!event.getResult().isAllowed()) { return null; @@ -73,21 +75,49 @@ public class RedisBungeeVelocityListener extends AbstractRedisBungeeListener(plugin) { + plugin.executeAsync(new RedisTask(plugin) { @Override - protected Void call(Jedis jedis) { - // this code was moved out from login event due being async.. - // and it can be cancelled but it will show as false in redis-bungee - // which will register the player into the redis database. + public Void jedisTask(Jedis jedis) { Pipeline pipeline = jedis.pipelined(); plugin.getUuidTranslator().persistInfo(event.getPlayer().getUsername(), event.getPlayer().getUniqueId(), pipeline); - RBUtils.createPlayer(event.getPlayer(), pipeline, false); + PlayerUtils.createPlayer(event.getPlayer(), pipeline, false); pipeline.sync(); // the end of moved code. @@ -96,20 +126,39 @@ public class RedisBungeeVelocityListener extends AbstractRedisBungeeListener( + event.getPlayer().getUniqueId(), plugin.getApi().getServerId(), AbstractDataManager.DataManagerMessage.Action.JOIN, + new AbstractDataManager.LoginPayload(event.getPlayer().getRemoteAddress().getAddress())))); + return null; + } }); } @Override @Subscribe public void onPlayerDisconnect(DisconnectEvent event) { - plugin.executeAsync(new RedisCallable(plugin) { + plugin.executeAsync(new RedisTask(plugin) { @Override - protected Void call(Jedis jedis) { + public Void jedisTask(Jedis jedis) { Pipeline pipeline = jedis.pipelined(); RedisUtil.cleanUpPlayer(event.getPlayer().getUniqueId().toString(), pipeline); pipeline.sync(); return null; } + + @Override + public Void clusterJedisTask(JedisCluster jedisCluster) { + RedisUtil.cleanUpPlayer(event.getPlayer().getUniqueId().toString(), jedisCluster); + return null; + } + }); } @@ -119,15 +168,25 @@ public class RedisBungeeVelocityListener extends AbstractRedisBungeeListener optionalServerConnection = event.getPlayer().getCurrentServer(); final String currentServer = optionalServerConnection.map(serverConnection -> serverConnection.getServerInfo().getName()).orElse(null); - plugin.executeAsync(new RedisCallable(plugin) { + plugin.executeAsync(new RedisTask(plugin) { + @Override - protected Void call(Jedis jedis) { + public Void jedisTask(Jedis jedis) { jedis.hset("player:" + event.getPlayer().getUniqueId().toString(), "server", event.getServer().getServerInfo().getName()); jedis.publish("redisbungee-data", gson.toJson(new AbstractDataManager.DataManagerMessage<>( event.getPlayer().getUniqueId(), plugin.getApi().getServerId(), AbstractDataManager.DataManagerMessage.Action.SERVER_CHANGE, new AbstractDataManager.ServerChangePayload(event.getServer().getServerInfo().getName(), currentServer)))); return null; } + + @Override + public Void clusterJedisTask(JedisCluster jedisCluster) { + jedisCluster.hset("player:" + event.getPlayer().getUniqueId().toString(), "server", event.getServer().getServerInfo().getName()); + jedisCluster.publish("redisbungee-data", gson.toJson(new AbstractDataManager.DataManagerMessage<>( + event.getPlayer().getUniqueId(), plugin.getApi().getServerId(), AbstractDataManager.DataManagerMessage.Action.SERVER_CHANGE, + new AbstractDataManager.ServerChangePayload(event.getServer().getServerInfo().getName(), currentServer)))); + return null; + } }); } diff --git a/RedisBungee-Velocity/src/main/java/com/imaginarycode/minecraft/redisbungee/RedisBungeeVelocityPlugin.java b/RedisBungee-Velocity/src/main/java/com/imaginarycode/minecraft/redisbungee/RedisBungeeVelocityPlugin.java index 057df38..9f9743c 100644 --- a/RedisBungee-Velocity/src/main/java/com/imaginarycode/minecraft/redisbungee/RedisBungeeVelocityPlugin.java +++ b/RedisBungee-Velocity/src/main/java/com/imaginarycode/minecraft/redisbungee/RedisBungeeVelocityPlugin.java @@ -8,18 +8,21 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Multimap; import com.google.common.reflect.TypeToken; import com.google.inject.Inject; +import com.imaginarycode.minecraft.redisbungee.api.*; +import com.imaginarycode.minecraft.redisbungee.api.summoners.ClusterJedisSummoner; +import com.imaginarycode.minecraft.redisbungee.api.summoners.JedisSummoner; +import com.imaginarycode.minecraft.redisbungee.api.summoners.Summoner; +import com.imaginarycode.minecraft.redisbungee.api.tasks.RedisTask; +import com.imaginarycode.minecraft.redisbungee.api.util.RedisUtil; +import com.imaginarycode.minecraft.redisbungee.api.util.io.IOUtil; +import com.imaginarycode.minecraft.redisbungee.api.util.lua.LuaManager; +import com.imaginarycode.minecraft.redisbungee.api.util.uuid.NameFetcher; +import com.imaginarycode.minecraft.redisbungee.api.util.uuid.UUIDFetcher; +import com.imaginarycode.minecraft.redisbungee.api.util.uuid.UUIDTranslator; import com.imaginarycode.minecraft.redisbungee.commands.RedisBungeeCommands; import com.imaginarycode.minecraft.redisbungee.events.PlayerChangedServerNetworkEvent; import com.imaginarycode.minecraft.redisbungee.events.PlayerJoinedNetworkEvent; import com.imaginarycode.minecraft.redisbungee.events.PubSubMessageEvent; -import com.imaginarycode.minecraft.redisbungee.internal.*; -import com.imaginarycode.minecraft.redisbungee.internal.summoners.Summoner; -import com.imaginarycode.minecraft.redisbungee.internal.summoners.SinglePoolJedisSummoner; -import com.imaginarycode.minecraft.redisbungee.internal.util.IOUtil; -import com.imaginarycode.minecraft.redisbungee.internal.util.LuaManager; -import com.imaginarycode.minecraft.redisbungee.internal.util.uuid.NameFetcher; -import com.imaginarycode.minecraft.redisbungee.internal.util.uuid.UUIDFetcher; -import com.imaginarycode.minecraft.redisbungee.internal.util.uuid.UUIDTranslator; import com.squareup.okhttp.Dispatcher; import com.squareup.okhttp.OkHttpClient; import com.velocitypowered.api.event.Subscribe; @@ -36,12 +39,11 @@ import com.velocitypowered.api.scheduler.ScheduledTask; import ninja.leaping.configurate.ConfigurationNode; import ninja.leaping.configurate.objectmapping.ObjectMappingException; import ninja.leaping.configurate.yaml.YAMLConfigurationLoader; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.slf4j.Logger; -import redis.clients.jedis.Jedis; -import redis.clients.jedis.JedisPool; -import redis.clients.jedis.JedisPoolConfig; -import redis.clients.jedis.Pipeline; +import redis.clients.jedis.*; import redis.clients.jedis.exceptions.JedisConnectionException; +import redis.clients.jedis.exceptions.JedisException; import static com.google.common.base.Preconditions.checkArgument; @@ -53,6 +55,7 @@ import java.nio.file.StandardCopyOption; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; @Plugin(id = "redisbungee", name = "RedisBungee", version = PomData.VERSION, url = "https://github.com/ProxioDev/RedisBungee", authors = "ProxioDev") public class RedisBungeeVelocityPlugin implements RedisBungeePlugin { @@ -61,7 +64,8 @@ public class RedisBungeeVelocityPlugin implements RedisBungeePlugin { private final Path dataFolder; private final RedisBungeeAPI api; private final PubSubListener psl; - private Summoner jedisSummoner; + private Summoner jedisSummoner; + private RedisBungeeMode redisBungeeMode; private final UUIDTranslator uuidTranslator; private RedisBungeeConfiguration configuration; private final VelocityDataManager dataManager; @@ -71,13 +75,13 @@ public class RedisBungeeVelocityPlugin implements RedisBungeePlugin { private final AtomicInteger globalPlayerCount = new AtomicInteger(); private ScheduledTask integrityCheck; private ScheduledTask heartbeatTask; - private LuaManager.Script serverToPlayersScript; - private LuaManager.Script getPlayerCountScript; + + private LuaManager.Script getRedisClusterTimeScript; private static final Object SERVER_TO_PLAYERS_KEY = new Object(); public static final List IDENTIFIERS = List.of( - MinecraftChannelIdentifier.create("legacy", "redisbungee"), - new LegacyChannelIdentifier("RedisBungee") + MinecraftChannelIdentifier.create("legacy", "redisbungee"), + new LegacyChannelIdentifier("RedisBungee") ); private final Cache> serverToPlayersCache = CacheBuilder.newBuilder() .expireAfterWrite(5, TimeUnit.SECONDS) @@ -97,6 +101,118 @@ public class RedisBungeeVelocityPlugin implements RedisBungeePlugin { throw new RuntimeException("Unable to connect to your Redis server!", e); } this.api = new RedisBungeeAPI(this); + LuaManager luaManager = new LuaManager(this); + new RedisTask(this) { + @Override + public Void jedisTask(Jedis jedis) { + // This is more portable than INFO
+ String info = jedis.info(); + for (String s : info.split("\r\n")) { + if (s.startsWith("redis_version:")) { + String version = s.split(":")[1]; + getLogger().info("{} <- redis version", version); + if (!RedisUtil.isRedisVersionRight(version)) { + getLogger().error("Your version of Redis ({}) is not at least version 6.0 RedisBungee requires a newer version of Redis.", version); + throw new RuntimeException("Unsupported Redis version detected"); + } + long uuidCacheSize = jedis.hlen("uuid-cache"); + if (uuidCacheSize > 750000) { + getLogger().info("Looks like you have a really big UUID cache! Run https://www.spigotmc.org/resources/redisbungeecleaner.8505/ as soon as possible."); + } + break; + } + } + return null; + } + + @Override + public Void clusterJedisTask(JedisCluster jedisCluster) { + // This is more portable than INFO
+ try { + getRedisClusterTimeScript = luaManager.createScript(IOUtil.readInputStreamAsString(getResourceAsStream("lua/get_cluster_time.lua"))); + } catch (JedisException e) { + throw new RuntimeException("possible not supported redis version", e); + } + String info = (String) luaManager.createScript(IOUtil.readInputStreamAsString(getResourceAsStream("lua/get_cluster_info.lua"))).eval(Collections.singletonList("0"), Collections.emptyList()); + for (String s : info.split("\r\n")) { + if (s.startsWith("redis_version:")) { + String version = s.split(":")[1]; + getLogger().info("{} <- redis version", version); + if (!RedisUtil.isRedisVersionRight(version)) { + getLogger().error("Your version of Redis ({}) is not at least version 6.0 RedisBungee requires a newer version of Redis.", version); + throw new RuntimeException("Unsupported Redis version detected"); + } + long uuidCacheSize = jedisCluster.hlen("uuid-cache"); + if (uuidCacheSize > 750000) { + getLogger().info("Looks like you have a really big UUID cache! Run https://www.spigotmc.org/resources/redisbungeecleaner.8505/ as soon as possible."); + } + break; + } + } + return null; + } + }.execute(); + getLogger().info("lua manager was loaded"); + // check if this proxy is recovering from a crash and start heart the beat. + new RedisTask(api) { + @Override + public Void jedisTask(Jedis jedis) { + Path crashFile = getDataFolder().resolve("restarted_from_crash.txt"); + if (Files.exists(crashFile)) { + try { + Files.delete(crashFile); + } catch (IOException e) { + throw new RuntimeException(e); + } + getLogger().info("crash file was deleted"); + } else if (jedis.hexists("heartbeats", configuration.getServerId())) { + try { + long value = Long.parseLong(jedis.hget("heartbeats", configuration.getServerId())); + long redisTime = getRedisTime(jedis.time()); + if (redisTime < value + 20) { + getLogger().error("You have launched a possible impostor Velocity / Bungeecord instance. Another instance is already running."); + getLogger().error("For data consistency reasons, RedisBungee will now disable itself."); + getLogger().error("If this instance is coming up from a crash, create a file in your RedisBungee plugins directory with the name 'restarted_from_crash.txt' and RedisBungee will not perform this check."); + throw new RuntimeException("Possible impostor instance!"); + } + } catch (NumberFormatException ignored) { + } + } else { + jedis.hset("heartbeats", configuration.getServerId(), jedis.time().get(0)); + } + + return null; + } + + @Override + public Void clusterJedisTask(JedisCluster jedisCluster) { + Path crashFile = getDataFolder().resolve("restarted_from_crash.txt"); + if (Files.exists(crashFile)) { + try { + Files.delete(crashFile); + } catch (IOException e) { + throw new RuntimeException(e); + } + getLogger().info("crash file was deleted"); + } else if (jedisCluster.hexists("heartbeats", configuration.getServerId())) { + try { + long value = Long.parseLong(jedisCluster.hget("heartbeats", configuration.getServerId())); + long redisTime = getRedisClusterTime(); + + if (redisTime < value + 20) { + getLogger().error("You have launched a possible impostor Velocity / Bungeecord instance. Another instance is already running."); + getLogger().error("For data consistency reasons, RedisBungee will now disable itself."); + getLogger().error("If this instance is coming up from a crash, create a file in your RedisBungee plugins directory with the name 'restarted_from_crash.txt' and RedisBungee will not perform this check."); + throw new RuntimeException("Possible impostor instance!"); + } + } catch (NumberFormatException ignored) { + } + } else { + jedisCluster.hset("heartbeats", configuration.getServerId(), String.valueOf(getRedisClusterTime())); + } + return null; + } + }.execute(); uuidTranslator = new UUIDTranslator(this); dataManager = new VelocityDataManager(this); psl = new PubSubListener(this); @@ -107,26 +223,7 @@ public class RedisBungeeVelocityPlugin implements RedisBungeePlugin { UUIDFetcher.setHttpClient(httpClient); // keeping this lol new RedisBungee(api); - // check if redis version compatible - try (Jedis jedis = requestJedis()) { - String info = jedis.info(); - for (String s : info.split("\r\n")) { - if (s.startsWith("redis_version:")) { - String version = s.split(":")[1]; - getLogger().info("{} <- redis version", version); - if (!RedisUtil.isRedisVersionRight(version)) { - getLogger().error("Your version of Redis ({}) is not at least version 6.0 RedisBungee requires a newer version of Redis.", version); - throw new RuntimeException("Unsupported Redis version detected"); - } else { - LuaManager manager = new LuaManager(this); - serverToPlayersScript = manager.createScript(IOUtil.readInputStreamAsString(getResourceAsStream("lua/server_to_players.lua"))); - getPlayerCountScript = manager.createScript(IOUtil.readInputStreamAsString(getResourceAsStream("lua/get_player_count.lua"))); - getLogger().info("lua manager was loaded"); - } - break; - } - } - } + } @@ -143,8 +240,42 @@ public class RedisBungeeVelocityPlugin implements RedisBungeePlugin { @Override public int getCurrentCount() { - Long count = (Long) getPlayerCountScript.eval(ImmutableList.of(), ImmutableList.of()); - return count.intValue(); + return new RedisTask(api) { + @Override + public Long jedisTask(Jedis jedis) { + long total = 0; + long redisTime = getRedisTime(jedis.time()); + Map heartBeats = jedis.hgetAll("heartbeats"); + for (Map.Entry stringStringEntry : heartBeats.entrySet()) { + String k = stringStringEntry.getKey(); + String v = stringStringEntry.getValue(); + + long heartbeatTime = Long.parseLong(v); + if (heartbeatTime + 30 >= redisTime) { + total = total + jedis.scard("proxy:" + k + ":usersOnline"); + } + } + return total; + } + + @Override + public Long clusterJedisTask(JedisCluster jedisCluster) { + long total = 0; + long redisTime = getRedisClusterTime(); + Map heartBeats = jedisCluster.hgetAll("heartbeats"); + for (Map.Entry stringStringEntry : heartBeats.entrySet()) { + String k = stringStringEntry.getKey(); + String v = stringStringEntry.getValue(); + + long heartbeatTime = Long.parseLong(v); + if (heartbeatTime + 30 >= redisTime) { + total = total + jedisCluster.scard("proxy:" + k + ":usersOnline"); + } + } + return total; + } + }.execute().intValue(); + } @Override @@ -163,45 +294,66 @@ public class RedisBungeeVelocityPlugin implements RedisBungeePlugin { @Override public Set getPlayers() { - ImmutableSet.Builder setBuilder = ImmutableSet.builder(); - if (isJedisAvailable()) { - try (Jedis rsc = requestJedis()) { - List keys = new ArrayList<>(); - for (String i : getServerIds()) { - keys.add("proxy:" + i + ":usersOnline"); - } - if (!keys.isEmpty()) { - Set users = rsc.sunion(keys.toArray(new String[0])); - if (users != null && !users.isEmpty()) { - for (String user : users) { - try { - setBuilder = setBuilder.add(UUID.fromString(user)); - } catch (IllegalArgumentException ignored) { + return new RedisTask>(api) { + @Override + public Set jedisTask(Jedis jedis) { + ImmutableSet.Builder setBuilder = ImmutableSet.builder(); + try { + List keys = new ArrayList<>(); + for (String i : getServerIds()) { + keys.add("proxy:" + i + ":usersOnline"); + } + if (!keys.isEmpty()) { + Set users = jedis.sunion(keys.toArray(new String[0])); + if (users != null && !users.isEmpty()) { + for (String user : users) { + try { + setBuilder = setBuilder.add(UUID.fromString(user)); + } catch (IllegalArgumentException ignored) { + } } } } + } catch (JedisConnectionException e) { + // Redis server has disappeared! + getLogger().error("Unable to get connection from pool - did your Redis server go away?", e); + throw new RuntimeException("Unable to get all players online", e); } - } catch (JedisConnectionException e) { - // Redis server has disappeared! - getLogger().error("Unable to get connection from pool - did your Redis server go away?", e); - throw new RuntimeException("Unable to get all players online", e); + return setBuilder.build(); } - } - return setBuilder.build(); + + @Override + public Set clusterJedisTask(JedisCluster jedisCluster) { + ImmutableSet.Builder setBuilder = ImmutableSet.builder(); + try { + List keys = new ArrayList<>(); + for (String i : getServerIds()) { + keys.add("proxy:" + i + ":usersOnline"); + } + if (!keys.isEmpty()) { + Set users = jedisCluster.sunion(keys.toArray(new String[0])); + if (users != null && !users.isEmpty()) { + for (String user : users) { + try { + setBuilder = setBuilder.add(UUID.fromString(user)); + } catch (IllegalArgumentException ignored) { + } + } + } + } + } catch (JedisConnectionException e) { + // Redis server has disappeared! + getLogger().error("Unable to get connection from pool - did your Redis server go away?", e); + throw new RuntimeException("Unable to get all players online", e); + } + return setBuilder.build(); + } + }.execute(); } - @Override - public Jedis requestJedis() { - return this.jedisSummoner.requestJedis(); - } @Override - public boolean isJedisAvailable() { - return this.jedisSummoner.isJedisAvailable(); - } - - @Override - public Summoner getSummoner() { + public Summoner getSummoner() { return this.jedisSummoner; } @@ -216,24 +368,32 @@ public class RedisBungeeVelocityPlugin implements RedisBungeePlugin { } @Override - @SuppressWarnings("unchecked") public Multimap serversToPlayers() { try { - return serverToPlayersCache.get(SERVER_TO_PLAYERS_KEY, () -> { - Collection data = (Collection) serverToPlayersScript.eval(ImmutableList.of(), getServerIds()); - ImmutableMultimap.Builder builder = ImmutableMultimap.builder(); - String key = null; - for (String s : data) { - if (key == null) { - key = s; - continue; + return serverToPlayersCache.get(SERVER_TO_PLAYERS_KEY, new RedisTask>(api) { + @Override + public Multimap jedisTask(Jedis jedis) { + ImmutableMultimap.Builder builder = ImmutableMultimap.builder(); + for (String serverId : getServerIds()) { + Set players = jedis.smembers("proxy:" + serverId + ":usersOnline"); + for (String player : players) { + builder.put(serverId, UUID.fromString(player)); + } } - - builder.put(key, UUID.fromString(s)); - key = null; + return builder.build(); } - return builder.build(); + @Override + public Multimap clusterJedisTask(JedisCluster jedisCluster) { + ImmutableMultimap.Builder builder = ImmutableMultimap.builder(); + for (String serverId : getServerIds()) { + Set players = jedisCluster.smembers("proxy:" + serverId + ":usersOnline"); + for (String player : players) { + builder.put(serverId, UUID.fromString(player)); + } + } + return builder.build(); + } }); } catch (ExecutionException e) { throw new RuntimeException(e); @@ -243,14 +403,27 @@ public class RedisBungeeVelocityPlugin implements RedisBungeePlugin { @Override public Set getPlayersOnProxy(String proxyId) { checkArgument(getServerIds().contains(proxyId), proxyId + " is not a valid proxy ID"); - try (Jedis jedis = requestJedis()) { - Set users = jedis.smembers("proxy:" + proxyId + ":usersOnline"); - ImmutableSet.Builder builder = ImmutableSet.builder(); - for (String user : users) { - builder.add(UUID.fromString(user)); + return new RedisTask>(api) { + @Override + public Set jedisTask(Jedis jedis) { + Set users = jedis.smembers("proxy:" + proxyId + ":usersOnline"); + ImmutableSet.Builder builder = ImmutableSet.builder(); + for (String user : users) { + builder.add(UUID.fromString(user)); + } + return builder.build(); } - return builder.build(); - } + + @Override + public Set clusterJedisTask(JedisCluster jedisCluster) { + Set users = jedisCluster.smembers("proxy:" + proxyId + ":usersOnline"); + ImmutableSet.Builder builder = ImmutableSet.builder(); + for (String user : users) { + builder.add(UUID.fromString(user)); + } + return builder.build(); + } + }.execute(); } @Override @@ -264,36 +437,74 @@ public class RedisBungeeVelocityPlugin implements RedisBungeePlugin { return serverIds; } + @Override public List getCurrentServerIds(boolean nag, boolean lagged) { - try (Jedis jedis = requestJedis()) { - long time = getRedisTime(jedis.time()); - int nagTime = 0; - if (nag) { - nagTime = nagAboutServers.decrementAndGet(); - if (nagTime <= 0) { - nagAboutServers.set(10); - } - } - ImmutableList.Builder servers = ImmutableList.builder(); - Map heartbeats = jedis.hgetAll("heartbeats"); - for (Map.Entry entry : heartbeats.entrySet()) { + return new RedisTask>(api) { + @Override + public List jedisTask(Jedis jedis) { try { - long stamp = Long.parseLong(entry.getValue()); - if (lagged ? time >= stamp + 30 : time <= stamp + 30) - servers.add(entry.getKey()); - else if (nag && nagTime <= 0) { - getLogger().warn("{} is {} seconds behind! (Time not synchronized or server down?) and was removed from heartbeat.", entry.getKey(), (time - stamp)); - jedis.hdel("heartbeats", entry.getKey()); + long time = getRedisTime(jedis.time()); + int nagTime = 0; + if (nag) { + nagTime = nagAboutServers.decrementAndGet(); + if (nagTime <= 0) { + nagAboutServers.set(10); + } } - } catch (NumberFormatException ignored) { + ImmutableList.Builder servers = ImmutableList.builder(); + Map heartbeats = jedis.hgetAll("heartbeats"); + for (Map.Entry entry : heartbeats.entrySet()) { + try { + long stamp = Long.parseLong(entry.getValue()); + if (lagged ? time >= stamp + 30 : time <= stamp + 30) + servers.add(entry.getKey()); + else if (nag && nagTime <= 0) { + getLogger().warn("{} is {} seconds behind! (Time not synchronized or server down?) and was removed from heartbeat.", entry.getKey(), (time - stamp)); + jedis.hdel("heartbeats", entry.getKey()); + } + } catch (NumberFormatException ignored) { + } + } + return servers.build(); + } catch (JedisConnectionException e) { + getLogger().error("Unable to fetch server IDs", e); + return Collections.singletonList(configuration.getServerId()); } } - return servers.build(); - } catch (JedisConnectionException e) { - getLogger().error("Unable to fetch server IDs", e); - return Collections.singletonList(configuration.getServerId()); - } + + @Override + public List clusterJedisTask(JedisCluster jedisCluster) { + try { + long time = getRedisClusterTime(); + int nagTime = 0; + if (nag) { + nagTime = nagAboutServers.decrementAndGet(); + if (nagTime <= 0) { + nagAboutServers.set(10); + } + } + ImmutableList.Builder servers = ImmutableList.builder(); + Map heartbeats = jedisCluster.hgetAll("heartbeats"); + for (Map.Entry entry : heartbeats.entrySet()) { + try { + long stamp = Long.parseLong(entry.getValue()); + if (lagged ? time >= stamp + 30 : time <= stamp + 30) + servers.add(entry.getKey()); + else if (nag && nagTime <= 0) { + getLogger().warn("{} is {} seconds behind! (Time not synchronized or server down?) and was removed from heartbeat.", entry.getKey(), (time - stamp)); + jedisCluster.hdel("heartbeats", entry.getKey()); + } + } catch (NumberFormatException ignored) { + } + } + return servers.build(); + } catch (JedisConnectionException e) { + getLogger().error("Unable to fetch server IDs", e); + return Collections.singletonList(configuration.getServerId()); + } + } + }.execute(); } @Override @@ -303,13 +514,31 @@ public class RedisBungeeVelocityPlugin implements RedisBungeePlugin { @Override public void sendChannelMessage(String channel, String message) { - try (Jedis jedis = requestJedis()) { - jedis.publish(channel, message); - } catch (JedisConnectionException e) { - // Redis server has disappeared! - getLogger().error("Unable to get connection from pool - did your Redis server go away?", e); - throw new RuntimeException("Unable to publish channel message", e); - } + new RedisTask(api) { + @Override + public Void jedisTask(Jedis jedis) { + try { + jedis.publish(channel, message); + } catch (JedisConnectionException e) { + // Redis server has disappeared! + getLogger().error("Unable to get connection from pool - did your Redis server go away?", e); + throw new RuntimeException("Unable to publish channel message", e); + } + return null; + } + + @Override + public Void clusterJedisTask(JedisCluster jedisCluster) { + try { + jedisCluster.publish(channel, message); + } catch (JedisConnectionException e) { + // Redis server has disappeared! + getLogger().error("Unable to get connection from pool - did your Redis server go away?", e); + throw new RuntimeException("Unable to publish channel message", e); + } + return null; + } + }.execute(); } @Override @@ -396,95 +625,176 @@ public class RedisBungeeVelocityPlugin implements RedisBungeePlugin { @Override public void initialize() { - // set the first heartbeat - try (Jedis tmpRsc = requestJedis()) { - tmpRsc.hset("heartbeats", configuration.getServerId(), tmpRsc.time().get(0)); - long uuidCacheSize = tmpRsc.hlen("uuid-cache"); - if (uuidCacheSize > 750000) { - getLogger().info("Looks like you have a really big UUID cache! Run https://www.spigotmc.org/resources/redisbungeecleaner.8505/ as soon as possible."); - } - } serverIds = getCurrentServerIds(true, false); - heartbeatTask = getProxy().getScheduler().buildTask(this, () -> { - try (Jedis rsc = requestJedis()) { - long redisTime = getRedisTime(rsc.time()); - rsc.hset("heartbeats", configuration.getServerId(), String.valueOf(redisTime)); - } catch (JedisConnectionException e) { - // Redis server has disappeared! - getLogger().error("Unable to update heartbeat - did your Redis server go away?", e); - return; + // start heartbeat task + RedisTask heartBeatRedisTask = new RedisTask(api) { + @Override + public Void jedisTask(Jedis jedis) { + try { + long redisTime = getRedisTime(jedis.time()); + jedis.hset("heartbeats", configuration.getServerId(), String.valueOf(redisTime)); + } catch (JedisConnectionException e) { + // Redis server has disappeared! + getLogger().error("Unable to update heartbeat - did your Redis server go away?", e); + return null; + } + try { + serverIds = getCurrentServerIds(true, false); + globalPlayerCount.set(getCurrentCount()); + } catch (Throwable e) { + getLogger().error("Unable to update data - did your Redis server go away?", e); + } + return null; } - try { - serverIds = getCurrentServerIds(true, false); - globalPlayerCount.set(getCurrentCount()); - } catch (Throwable e) { - getLogger().error("Unable to update data - did your Redis server go away?", e); + + @Override + public Void clusterJedisTask(JedisCluster jedisCluster) { + try { + long redisTime = getRedisClusterTime(); + jedisCluster.hset("heartbeats", configuration.getServerId(), String.valueOf(redisTime)); + } catch (JedisConnectionException e) { + // Redis server has disappeared! + getLogger().error("Unable to update heartbeat - did your Redis server go away?", e); + return null; + } + try { + serverIds = getCurrentServerIds(true, false); + globalPlayerCount.set(getCurrentCount()); + } catch (Throwable e) { + getLogger().error("Unable to update data - did your Redis server go away?", e); + } + return null; } - }).repeat(3, TimeUnit.SECONDS).schedule(); + }; + heartbeatTask = getProxy().getScheduler().buildTask(this, heartBeatRedisTask::execute).repeat(3, TimeUnit.SECONDS).schedule(); getProxy().getEventManager().register(this, new RedisBungeeVelocityListener(this, configuration.getExemptAddresses())); getProxy().getEventManager().register(this, dataManager); getProxy().getScheduler().buildTask(this, psl).schedule(); - integrityCheck = getProxy().getScheduler().buildTask(this, () -> { - try (Jedis tmpRsc = requestJedis()) { - Set players = getLocalPlayersAsUuidStrings(); - Set playersInRedis = tmpRsc.smembers("proxy:" + configuration.getServerId() + ":usersOnline"); - List lagged = getCurrentServerIds(false, true); + RedisTask integrityCheckRedisTask = new RedisTask(api) { + @Override + public Void jedisTask(Jedis jedis) { + try { + Set players = getLocalPlayersAsUuidStrings(); + Set playersInRedis = jedis.smembers("proxy:" + configuration.getServerId() + ":usersOnline"); + List lagged = getCurrentServerIds(false, true); - // Clean up lagged players. - for (String s : lagged) { - Set laggedPlayers = tmpRsc.smembers("proxy:" + s + ":usersOnline"); - tmpRsc.del("proxy:" + s + ":usersOnline"); - if (!laggedPlayers.isEmpty()) { - getLogger().info("Cleaning up lagged proxy {} ({} players)...", s, laggedPlayers.size()); - for (String laggedPlayer : laggedPlayers) { - RedisUtil.cleanUpPlayer(laggedPlayer, tmpRsc); + // Clean up lagged players. + for (String s : lagged) { + Set laggedPlayers = jedis.smembers("proxy:" + s + ":usersOnline"); + jedis.del("proxy:" + s + ":usersOnline"); + if (!laggedPlayers.isEmpty()) { + getLogger().info("Cleaning up lagged proxy {} ({} players)...", s, laggedPlayers.size()); + for (String laggedPlayer : laggedPlayers) { + RedisUtil.cleanUpPlayer(laggedPlayer, jedis); + } } } - } - Set absentLocally = new HashSet<>(playersInRedis); - absentLocally.removeAll(players); - Set absentInRedis = new HashSet<>(players); - absentInRedis.removeAll(playersInRedis); + Set absentLocally = new HashSet<>(playersInRedis); + absentLocally.removeAll(players); + Set absentInRedis = new HashSet<>(players); + absentInRedis.removeAll(playersInRedis); - for (String member : absentLocally) { - boolean found = false; - for (String proxyId : getServerIds()) { - if (proxyId.equals(configuration.getServerId())) continue; - if (tmpRsc.sismember("proxy:" + proxyId + ":usersOnline", member)) { - // Just clean up the set. - found = true; - break; + for (String member : absentLocally) { + boolean found = false; + for (String proxyId : getServerIds()) { + if (proxyId.equals(configuration.getServerId())) continue; + if (jedis.sismember("proxy:" + proxyId + ":usersOnline", member)) { + // Just clean up the set. + found = true; + break; + } + } + if (!found) { + RedisUtil.cleanUpPlayer(member, jedis); + getLogger().warn("Player found in set that was not found locally and globally: {}", member); + } else { + jedis.srem("proxy:" + configuration.getServerId() + ":usersOnline", member); + getLogger().warn("Player found in set that was not found locally, but is on another proxy: {}", member); } } - if (!found) { - RedisUtil.cleanUpPlayer(member, tmpRsc); - getLogger().warn("Player found in set that was not found locally and globally: {}", member); - } else { - tmpRsc.srem("proxy:" + configuration.getServerId() + ":usersOnline", member); - getLogger().warn("Player found in set that was not found locally, but is on another proxy: {}", member); + + Pipeline pipeline = jedis.pipelined(); + + for (String player : absentInRedis) { + // Player not online according to Redis but not BungeeCord. + getLogger().warn("Player {} is on the proxy but not in Redis.", player); + + Player playerProxied = getProxy().getPlayer(UUID.fromString(player)).orElse(null); + if (playerProxied == null) + continue; // We'll deal with it later. + + PlayerUtils.createPlayer(playerProxied, pipeline, true); } + + pipeline.sync(); + } catch (Throwable e) { + getLogger().error("Unable to fix up stored player data", e); } - - Pipeline pipeline = tmpRsc.pipelined(); - - for (String player : absentInRedis) { - // Player not online according to Redis but not BungeeCord. - getLogger().warn("Player {} is on the proxy but not in Redis.", player); - - Player playerProxied = getProxy().getPlayer(UUID.fromString(player)).orElse(null); - if (playerProxied == null) - continue; // We'll deal with it later. - - RBUtils.createPlayer(playerProxied, pipeline, true); - } - - pipeline.sync(); - } catch (Throwable e) { - getLogger().error("Unable to fix up stored player data", e); + return null; } - }).repeat(1, TimeUnit.MINUTES).schedule(); + + @Override + public Void clusterJedisTask(JedisCluster jedisCluster) { + try { + Set players = getLocalPlayersAsUuidStrings(); + Set playersInRedis = jedisCluster.smembers("proxy:" + configuration.getServerId() + ":usersOnline"); + List lagged = getCurrentServerIds(false, true); + + // Clean up lagged players. + for (String s : lagged) { + Set laggedPlayers = jedisCluster.smembers("proxy:" + s + ":usersOnline"); + jedisCluster.del("proxy:" + s + ":usersOnline"); + if (!laggedPlayers.isEmpty()) { + getLogger().info("Cleaning up lagged proxy {} ({} players)...", s, laggedPlayers.size()); + for (String laggedPlayer : laggedPlayers) { + RedisUtil.cleanUpPlayer(laggedPlayer, jedisCluster); + } + } + } + + Set absentLocally = new HashSet<>(playersInRedis); + absentLocally.removeAll(players); + Set absentInRedis = new HashSet<>(players); + absentInRedis.removeAll(playersInRedis); + + for (String member : absentLocally) { + boolean found = false; + for (String proxyId : getServerIds()) { + if (proxyId.equals(configuration.getServerId())) continue; + if (jedisCluster.sismember("proxy:" + proxyId + ":usersOnline", member)) { + // Just clean up the set. + found = true; + break; + } + } + if (!found) { + RedisUtil.cleanUpPlayer(member, jedisCluster); + getLogger().warn("Player found in set that was not found locally and globally: {}", member); + } else { + jedisCluster.srem("proxy:" + configuration.getServerId() + ":usersOnline", member); + getLogger().warn("Player found in set that was not found locally, but is on another proxy: {}", member); + } + } + + for (String player : absentInRedis) { + // Player not online according to Redis but not BungeeCord. + getLogger().warn("Player {} is on the proxy but not in Redis.", player); + + Player playerProxied = getProxy().getPlayer(UUID.fromString(player)).orElse(null); + if (playerProxied == null) + continue; // We'll deal with it later. + + PlayerUtils.createPlayer(playerProxied, jedisCluster, true); + } + } catch (Throwable e) { + getLogger().error("Unable to fix up stored player data", e); + } + return null; + } + }; + integrityCheck = getProxy().getScheduler().buildTask(this, integrityCheckRedisTask::execute).repeat(1, TimeUnit.MINUTES).schedule(); // register plugin messages IDENTIFIERS.forEach(getProxy().getChannelRegistrar()::register); @@ -494,7 +804,7 @@ public class RedisBungeeVelocityPlugin implements RedisBungeePlugin { if (configuration.doOverrideBungeeCommands()) { getProxy().getCommandManager().register("glist", new RedisBungeeCommands.GlistCommand(this), "redisbungee", "rglist"); } - + getProxy().getCommandManager().register("sendtoall", new RedisBungeeCommands.SendToAll(this), "rsendtoall"); getProxy().getCommandManager().register("serverid", new RedisBungeeCommands.ServerId(this), "rserverid"); getProxy().getCommandManager().register("serverids", new RedisBungeeCommands.ServerIds(this)); @@ -507,25 +817,45 @@ public class RedisBungeeVelocityPlugin implements RedisBungeePlugin { @Override public void stop() { - if (isJedisAvailable()) { - // Poison the PubSub listener + // Poison the PubSub listener + if (psl != null) { psl.poison(); - integrityCheck.cancel(); - heartbeatTask.cancel(); - try (Jedis tmpRsc = requestJedis()) { - tmpRsc.hdel("heartbeats", configuration.getServerId()); - if (tmpRsc.scard("proxy:" + configuration.getServerId() + ":usersOnline") > 0) { - Set players = tmpRsc.smembers("proxy:" + configuration.getServerId() + ":usersOnline"); - for (String member : players) - RedisUtil.cleanUpPlayer(member, tmpRsc); - } - } - try { - this.jedisSummoner.close(); - } catch (IOException e) { - throw new RuntimeException(e); - } } + if (integrityCheck != null) { + integrityCheck.cancel(); + } + if (heartbeatTask != null) { + heartbeatTask.cancel(); + } + new RedisTask(api) { + @Override + public Void jedisTask(Jedis jedis) { + jedis.hdel("heartbeats", configuration.getServerId()); + if (jedis.scard("proxy:" + configuration.getServerId() + ":usersOnline") > 0) { + Set players = jedis.smembers("proxy:" + configuration.getServerId() + ":usersOnline"); + for (String member : players) + RedisUtil.cleanUpPlayer(member, jedis); + } + return null; + } + + @Override + public Void clusterJedisTask(JedisCluster jedisCluster) { + jedisCluster.hdel("heartbeats", configuration.getServerId()); + if (jedisCluster.scard("proxy:" + configuration.getServerId() + ":usersOnline") > 0) { + Set players = jedisCluster.smembers("proxy:" + configuration.getServerId() + ":usersOnline"); + for (String member : players) + RedisUtil.cleanUpPlayer(member, jedisCluster); + } + return null; + } + }.execute(); + try { + this.jedisSummoner.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + this.httpClient.getDispatcher().getExecutorService().shutdown(); try { this.httpClient.getDispatcher().getExecutorService().awaitTermination(20, TimeUnit.SECONDS); @@ -543,7 +873,7 @@ public class RedisBungeeVelocityPlugin implements RedisBungeePlugin { } catch (IOException e) { getLogger().error("Cannot create data folder", e); } - + } Path file = getDataFolder().resolve("config.yml"); if (Files.notExists(file)) { @@ -557,11 +887,11 @@ public class RedisBungeeVelocityPlugin implements RedisBungeePlugin { } final YAMLConfigurationLoader yamlConfiguration = YAMLConfigurationLoader.builder().setPath(file).build(); ConfigurationNode node = yamlConfiguration.load(); - final String redisServer = node.getNode("redis-server").getString(); - final int redisPort = node.getNode("redis-port").getInt(); - final boolean useSSL = node.getNode("useSSL").getBoolean(); - String redisPassword = node.getNode("redis-password").getString(); - String serverId = node.getNode("server-id").getString(); + final String redisServer = node.getNode("redis-server").getString("127.0.0.1"); + final int redisPort = node.getNode("redis-port").getInt(6379); + final boolean useSSL = node.getNode("useSSL").getBoolean(false); + String redisPassword = node.getNode("redis-password").getString(null); + String serverId = node.getNode("server-id").getString("test-1"); // check redis password if (redisPassword != null && (redisPassword.isEmpty() || redisPassword.equals("none"))) { @@ -586,57 +916,47 @@ public class RedisBungeeVelocityPlugin implements RedisBungeePlugin { getLogger().info("Loaded server id {}.", serverId); } try { - this.configuration = new RedisBungeeConfiguration(serverId, node.getNode("exempt-ip-addresses").getList(TypeToken.of(String.class)), node.getNode("register-bungee-commands").getBoolean()); + this.configuration = new RedisBungeeConfiguration(serverId, node.getNode("exempt-ip-addresses").getList(TypeToken.of(String.class)), node.getNode("register-bungee-commands").getBoolean(true)); } catch (ObjectMappingException e) { throw new RuntimeException(e); } if (redisServer != null && !redisServer.isEmpty()) { - try { - JedisPoolConfig config = new JedisPoolConfig(); - config.setMaxTotal(node.getNode("max-redis-connections").getInt()); - this.jedisSummoner = new SinglePoolJedisSummoner(new JedisPool(config, redisServer, redisPort, 0, redisPassword, useSSL)); - - } catch (JedisConnectionException e) { - throw new RuntimeException("Unable to create Redis pool", e); - } - - // Test the connection - try (Jedis rsc = requestJedis()) { - rsc.ping(); - // If that worked, now we can check for an existing, alive Bungee: - Path crashFile = getDataFolder().resolve("restarted_from_crash.txt"); - if (Files.exists(crashFile)) { - try { - Files.delete(crashFile); - getLogger().info("crash file was deleted"); - } catch (IOException e) { - getLogger().error("Cannot delete crash file", e); - } - - } else if (rsc.hexists("heartbeats", serverId)) { - try { - long value = Long.parseLong(rsc.hget("heartbeats", serverId)); - long redisTime = getRedisTime(rsc.time()); - if (redisTime < value + 20) { - getLogger().error("You have launched a possible impostor Velocity / Bungeecord instance. Another instance is already running."); - getLogger().error("For data consistency reasons, RedisBungee will now disable itself."); - getLogger().error("If this instance is coming up from a crash, create a file in your RedisBungee plugins directory with the name 'restarted_from_crash.txt' and RedisBungee will not perform this check."); - throw new RuntimeException("Possible impostor instance!"); - } - } catch (NumberFormatException ignored) { - } + if (node.getNode("cluster-mode-enabled").getBoolean(false)) { + GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig<>(); + poolConfig.setMaxTotal(node.getNode("max-redis-connections").getInt(8)); + if (redisPassword != null) { + this.jedisSummoner = new ClusterJedisSummoner(new JedisCluster(new HostAndPort(redisServer, redisPort), 5000, 5000, 60, serverId, redisPassword, poolConfig, useSSL)); + } else { + getLogger().warn("SSL option is ignored in Cluster mode if no PASSWORD is set"); + this.jedisSummoner = new ClusterJedisSummoner(new JedisCluster(new HostAndPort(redisServer, redisPort), 5000, 5000, 60, poolConfig)); } - getLogger().info("Successfully connected to Redis."); - } catch (JedisConnectionException e) { - this.jedisSummoner.close(); - throw e; + this.redisBungeeMode = RedisBungeeMode.CLUSTER; + getLogger().info("RedisBungee MODE: CLUSTER"); + } else { + JedisPoolConfig config = new JedisPoolConfig(); + config.setMaxTotal(node.getNode("max-redis-connections").getInt(8)); + this.jedisSummoner = new JedisSummoner(new JedisPool(config, redisServer, redisPort, 0, redisPassword, useSSL)); + this.redisBungeeMode = RedisBungeeMode.SINGLE; + getLogger().info("RedisBungee MODE: SINGLE"); } + getLogger().info("Successfully connected to Redis."); + } else { throw new RuntimeException("No redis server specified!"); } } + @Override + public RedisBungeeMode getRedisBungeeMode() { + return this.redisBungeeMode; + } + + @Override + public Long getRedisClusterTime() { + return getRedisTime((List) this.getRedisClusterTimeScript.eval(Collections.singletonList("0"), Collections.emptyList())); + } + @Subscribe public void proxyInit(ProxyInitializeEvent event) { initialize(); diff --git a/RedisBungee-Velocity/src/main/java/com/imaginarycode/minecraft/redisbungee/VelocityDataManager.java b/RedisBungee-Velocity/src/main/java/com/imaginarycode/minecraft/redisbungee/VelocityDataManager.java index 1daa3f7..cf3c82f 100644 --- a/RedisBungee-Velocity/src/main/java/com/imaginarycode/minecraft/redisbungee/VelocityDataManager.java +++ b/RedisBungee-Velocity/src/main/java/com/imaginarycode/minecraft/redisbungee/VelocityDataManager.java @@ -1,8 +1,8 @@ package com.imaginarycode.minecraft.redisbungee; import com.imaginarycode.minecraft.redisbungee.events.PubSubMessageEvent; -import com.imaginarycode.minecraft.redisbungee.internal.AbstractDataManager; -import com.imaginarycode.minecraft.redisbungee.internal.RedisBungeePlugin; +import com.imaginarycode.minecraft.redisbungee.api.AbstractDataManager; +import com.imaginarycode.minecraft.redisbungee.api.RedisBungeePlugin; import com.velocitypowered.api.event.Subscribe; import com.velocitypowered.api.event.connection.DisconnectEvent; import com.velocitypowered.api.event.connection.PostLoginEvent;