From 86c6e9464d79ab9ff19ee42d2e4c109c885f3614 Mon Sep 17 00:00:00 2001 From: mohammed jasem alaajel Date: Tue, 26 Jul 2022 10:58:00 +0400 Subject: [PATCH] internal changes --- .../minecraft/redisbungee/RedisBungeeAPI.java | 2 +- .../redisbungee/api/RedisBungeePlugin.java | 298 ++++++++- .../redisbungee/api/tasks/HeartbeatTask.java | 2 +- .../redisbungee/api/tasks/InitialUtils.java | 136 ++++ .../api/tasks/IntegrityCheckTask.java | 145 +++++ .../redisbungee/api/tasks/ShutdownUtils.java | 39 ++ .../redisbungee/api/util/RedisUtil.java | 1 + .../{get_cluster_info.lua => get_info.lua} | 0 .../{get_cluster_time.lua => get_time.lua} | 0 .../redisbungee/RedisBungeeBungeePlugin.java | 577 +---------------- .../RedisBungeeVelocityPlugin.java | 586 +----------------- 11 files changed, 650 insertions(+), 1136 deletions(-) create mode 100644 RedisBungee-API/src/main/java/com/imaginarycode/minecraft/redisbungee/api/tasks/InitialUtils.java create mode 100644 RedisBungee-API/src/main/java/com/imaginarycode/minecraft/redisbungee/api/tasks/IntegrityCheckTask.java create mode 100644 RedisBungee-API/src/main/java/com/imaginarycode/minecraft/redisbungee/api/tasks/ShutdownUtils.java rename RedisBungee-API/src/main/resources/lua/{get_cluster_info.lua => get_info.lua} (100%) rename RedisBungee-API/src/main/resources/lua/{get_cluster_time.lua => get_time.lua} (100%) diff --git a/RedisBungee-API/src/main/java/com/imaginarycode/minecraft/redisbungee/RedisBungeeAPI.java b/RedisBungee-API/src/main/java/com/imaginarycode/minecraft/redisbungee/RedisBungeeAPI.java index aa0aa73..ecaaaa8 100644 --- a/RedisBungee-API/src/main/java/com/imaginarycode/minecraft/redisbungee/RedisBungeeAPI.java +++ b/RedisBungee-API/src/main/java/com/imaginarycode/minecraft/redisbungee/RedisBungeeAPI.java @@ -105,7 +105,7 @@ public class RedisBungeeAPI { * @since 0.2.5 */ public final Multimap getServerToPlayers() { - return plugin.serversToPlayers(); + return plugin.serverToPlayersCache(); } /** diff --git a/RedisBungee-API/src/main/java/com/imaginarycode/minecraft/redisbungee/api/RedisBungeePlugin.java b/RedisBungee-API/src/main/java/com/imaginarycode/minecraft/redisbungee/api/RedisBungeePlugin.java index 2d939c8..d6dbac0 100644 --- a/RedisBungee-API/src/main/java/com/imaginarycode/minecraft/redisbungee/api/RedisBungeePlugin.java +++ b/RedisBungee-API/src/main/java/com/imaginarycode/minecraft/redisbungee/api/RedisBungeePlugin.java @@ -1,18 +1,27 @@ package com.imaginarycode.minecraft.redisbungee.api; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMultimap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Multimap; import com.imaginarycode.minecraft.redisbungee.RedisBungeeAPI; import com.imaginarycode.minecraft.redisbungee.api.config.ConfigLoader; import com.imaginarycode.minecraft.redisbungee.api.config.RedisBungeeConfiguration; import com.imaginarycode.minecraft.redisbungee.api.summoners.Summoner; +import com.imaginarycode.minecraft.redisbungee.api.tasks.RedisTask; +import com.imaginarycode.minecraft.redisbungee.api.util.RedisUtil; +import com.imaginarycode.minecraft.redisbungee.api.util.payload.PayloadUtils; import com.imaginarycode.minecraft.redisbungee.api.util.uuid.UUIDTranslator; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.exceptions.JedisConnectionException; import java.net.InetAddress; -import java.util.List; -import java.util.Set; -import java.util.UUID; +import java.util.*; import java.util.concurrent.TimeUnit; +import static com.google.common.base.Preconditions.checkArgument; + /** * This Class has all internal methods needed by every redis bungee plugin, and it can be used to implement another platforms than bungeecord @@ -36,31 +45,265 @@ public interface RedisBungeePlugin

extends EventsPlatform, ConfigLoader { int getCount(); - int getCurrentCount(); + default int getCurrentCount() { + return new RedisTask(this) { + @Override + public Long jedisTask(Jedis jedis) { + long total = 0; + long redisTime = getRedisTime(jedis.time()); + Map heartBeats = jedis.hgetAll("heartbeats"); + for (Map.Entry stringStringEntry : heartBeats.entrySet()) { + String k = stringStringEntry.getKey(); + String v = stringStringEntry.getValue(); + + long heartbeatTime = Long.parseLong(v); + if (heartbeatTime + 30 >= redisTime) { + total = total + jedis.scard("proxy:" + k + ":usersOnline"); + } + } + return total; + } + + @Override + public Long clusterJedisTask(JedisCluster jedisCluster) { + long total = 0; + long redisTime = getRedisTime(); + Map heartBeats = jedisCluster.hgetAll("heartbeats"); + for (Map.Entry stringStringEntry : heartBeats.entrySet()) { + String k = stringStringEntry.getKey(); + String v = stringStringEntry.getValue(); + + long heartbeatTime = Long.parseLong(v); + if (heartbeatTime + 30 >= redisTime) { + total = total + jedisCluster.scard("proxy:" + k + ":usersOnline"); + } + } + return total; + } + }.execute().intValue(); + } Set getLocalPlayersAsUuidStrings(); AbstractDataManager getDataManager(); - Set getPlayers(); + default Set getPlayers() { + return new RedisTask>(this) { + @Override + public Set jedisTask(Jedis jedis) { + ImmutableSet.Builder setBuilder = ImmutableSet.builder(); + try { + List keys = new ArrayList<>(); + for (String i : getProxiesIds()) { + keys.add("proxy:" + i + ":usersOnline"); + } + if (!keys.isEmpty()) { + Set users = jedis.sunion(keys.toArray(new String[0])); + if (users != null && !users.isEmpty()) { + for (String user : users) { + try { + setBuilder = setBuilder.add(UUID.fromString(user)); + } catch (IllegalArgumentException ignored) { + } + } + } + } + } catch (JedisConnectionException e) { + // Redis server has disappeared! + logFatal("Unable to get connection from pool - did your Redis server go away?"); + throw new RuntimeException("Unable to get all players online", e); + } + return setBuilder.build(); + } + + @Override + public Set clusterJedisTask(JedisCluster jedisCluster) { + ImmutableSet.Builder setBuilder = ImmutableSet.builder(); + try { + List keys = new ArrayList<>(); + for (String i : getProxiesIds()) { + keys.add("proxy:" + i + ":usersOnline"); + } + if (!keys.isEmpty()) { + Set users = jedisCluster.sunion(keys.toArray(new String[0])); + if (users != null && !users.isEmpty()) { + for (String user : users) { + try { + setBuilder = setBuilder.add(UUID.fromString(user)); + } catch (IllegalArgumentException ignored) { + } + } + } + } + } catch (JedisConnectionException e) { + // Redis server has disappeared! + logFatal("Unable to get connection from pool - did your Redis server go away?"); + throw new RuntimeException("Unable to get all players online", e); + } + return setBuilder.build(); + } + }.execute(); + } RedisBungeeAPI getApi(); UUIDTranslator getUuidTranslator(); - Multimap serversToPlayers(); + Multimap serverToPlayersCache(); - Set getPlayersOnProxy(String proxyId); + default Multimap serversToPlayers() { + return new RedisTask>(this) { + @Override + public Multimap jedisTask(Jedis jedis) { + ImmutableMultimap.Builder builder = ImmutableMultimap.builder(); + for (String serverId : getProxiesIds()) { + Set players = jedis.smembers("proxy:" + serverId + ":usersOnline"); + for (String player : players) { + String playerServer = jedis.hget("player:" + player, "server"); + if (playerServer == null) { + continue; + } + builder.put(playerServer, UUID.fromString(player)); + } + } + return builder.build(); + } - void sendProxyCommand(String serverId, String command); + @Override + public Multimap clusterJedisTask(JedisCluster jedisCluster) { + ImmutableMultimap.Builder builder = ImmutableMultimap.builder(); + for (String serverId : getProxiesIds()) { + Set players = jedisCluster.smembers("proxy:" + serverId + ":usersOnline"); + for (String player : players) { + String playerServer = jedisCluster.hget("player:" + player, "server"); + if (playerServer == null) { + continue; + } + builder.put(playerServer, UUID.fromString(player)); + } + } + return builder.build(); + } + }.execute(); + } + + default Set getPlayersOnProxy(String proxyId) { + checkArgument(getProxiesIds().contains(proxyId), proxyId + " is not a valid proxy ID"); + return new RedisTask>(this) { + @Override + public Set jedisTask(Jedis jedis) { + Set users = jedis.smembers("proxy:" + proxyId + ":usersOnline"); + ImmutableSet.Builder builder = ImmutableSet.builder(); + for (String user : users) { + builder.add(UUID.fromString(user)); + } + return builder.build(); + } + + @Override + public Set clusterJedisTask(JedisCluster jedisCluster) { + Set users = jedisCluster.smembers("proxy:" + proxyId + ":usersOnline"); + ImmutableSet.Builder builder = ImmutableSet.builder(); + for (String user : users) { + builder.add(UUID.fromString(user)); + } + return builder.build(); + } + }.execute(); + } + + default void sendProxyCommand(String proxyId, String command) { + checkArgument(getProxiesIds().contains(proxyId) || proxyId.equals("allservers"), "proxyId is invalid"); + sendChannelMessage("redisbungee-" + proxyId, command); + } List getProxiesIds(); - List getCurrentProxiesIds(boolean nag, boolean lagged); + default List getCurrentProxiesIds(boolean lagged) { + return new RedisTask>(this) { + @Override + public List jedisTask(Jedis jedis) { + try { + long time = getRedisTime(jedis.time()); + ImmutableList.Builder servers = ImmutableList.builder(); + Map heartbeats = jedis.hgetAll("heartbeats"); + for (Map.Entry entry : heartbeats.entrySet()) { + try { + long stamp = Long.parseLong(entry.getValue()); + if (lagged ? time >= stamp + RedisUtil.PROXY_TIMEOUT : time <= stamp + RedisUtil.PROXY_TIMEOUT) { + servers.add(entry.getKey()); + } else if (time > stamp + RedisUtil.PROXY_TIMEOUT) { + logWarn(entry.getKey() + " is " + (time - stamp) + " seconds behind! (Time not synchronized or server down?) and was removed from heartbeat."); + jedis.hdel("heartbeats", entry.getKey()); + } + } catch (NumberFormatException ignored) { + } + } + return servers.build(); + } catch (JedisConnectionException e) { + logFatal("Unable to fetch server IDs"); + e.printStackTrace(); + return Collections.singletonList(getConfiguration().getProxyId()); + } + } + + @Override + public List clusterJedisTask(JedisCluster jedisCluster) { + try { + long time = getRedisTime(); + ImmutableList.Builder servers = ImmutableList.builder(); + Map heartbeats = jedisCluster.hgetAll("heartbeats"); + for (Map.Entry entry : heartbeats.entrySet()) { + try { + long stamp = Long.parseLong(entry.getValue()); + if (lagged ? time >= stamp + RedisUtil.PROXY_TIMEOUT : time <= stamp + RedisUtil.PROXY_TIMEOUT) { + servers.add(entry.getKey()); + } else if (time > stamp + RedisUtil.PROXY_TIMEOUT) { + logWarn(entry.getKey() + " is " + (time - stamp) + " seconds behind! (Time not synchronized or server down?) and was removed from heartbeat."); + jedisCluster.hdel("heartbeats", entry.getKey()); + } + } catch (NumberFormatException ignored) { + } + } + return servers.build(); + } catch (JedisConnectionException e) { + logFatal("Unable to fetch server IDs"); + e.printStackTrace(); + return Collections.singletonList(getConfiguration().getProxyId()); + } + } + }.execute(); + } PubSubListener getPubSubListener(); - void sendChannelMessage(String channel, String message); + default void sendChannelMessage(String channel, String message) { + new RedisTask(this) { + @Override + public Void jedisTask(Jedis jedis) { + try { + jedis.publish(channel, message); + } catch (JedisConnectionException e) { + // Redis server has disappeared! + logFatal("Unable to get connection from pool - did your Redis server go away?"); + throw new RuntimeException("Unable to publish channel message", e); + } + return null; + } + + @Override + public Void clusterJedisTask(JedisCluster jedisCluster) { + try { + jedisCluster.publish(channel, message); + } catch (JedisConnectionException e) { + // Redis server has disappeared! + logFatal("Unable to get connection from pool - did your Redis server go away?"); + throw new RuntimeException("Unable to publish channel message", e); + } + return null; + } + }.execute(); + } void executeAsync(Runnable runnable); @@ -90,17 +333,42 @@ public interface RedisBungeePlugin

extends EventsPlatform, ConfigLoader { InetAddress getPlayerIp(P player); - void sendProxyCommand(String cmd); + default void sendProxyCommand(String cmd) { + sendProxyCommand(getConfiguration().getProxyId(), cmd); + } - long getRedisTime(List timeRes); + default long getRedisTime(List timeRes) { + return Long.parseLong(timeRes.get(0)); + } - void kickPlayer(UUID playerUniqueId, String message); + default void kickPlayer(UUID playerUniqueId, String message) { + // first handle on origin proxy if player not found publish the payload + if (!getDataManager().handleKick(playerUniqueId, message)) { + new RedisTask(this) { + @Override + public Void jedisTask(Jedis jedis) { + PayloadUtils.kickPlayerPayload(playerUniqueId, message, jedis); + return null; + } - void kickPlayer(String playerName, String message); + @Override + public Void clusterJedisTask(JedisCluster jedisCluster) { + PayloadUtils.kickPlayerPayload(playerUniqueId, message, jedisCluster); + return null; + } + }.execute(); + } + } + + default void kickPlayer(String playerName, String message) { + // fetch the uuid from name + UUID playerUUID = getUuidTranslator().getTranslatedUuid(playerName, true); + kickPlayer(playerUUID, message); + } RedisBungeeMode getRedisBungeeMode(); - Long getRedisClusterTime(); + Long getRedisTime(); void updateProxyIds(); diff --git a/RedisBungee-API/src/main/java/com/imaginarycode/minecraft/redisbungee/api/tasks/HeartbeatTask.java b/RedisBungee-API/src/main/java/com/imaginarycode/minecraft/redisbungee/api/tasks/HeartbeatTask.java index 4e61df2..44e208b 100644 --- a/RedisBungee-API/src/main/java/com/imaginarycode/minecraft/redisbungee/api/tasks/HeartbeatTask.java +++ b/RedisBungee-API/src/main/java/com/imaginarycode/minecraft/redisbungee/api/tasks/HeartbeatTask.java @@ -43,7 +43,7 @@ public class HeartbeatTask extends RedisTask{ @Override public Void clusterJedisTask(JedisCluster jedisCluster) { try { - long redisTime = plugin.getRedisClusterTime(); + long redisTime = plugin.getRedisTime(); jedisCluster.hset("heartbeats", plugin.getConfiguration().getProxyId(), String.valueOf(redisTime)); } catch (JedisConnectionException e) { // Redis server has disappeared! diff --git a/RedisBungee-API/src/main/java/com/imaginarycode/minecraft/redisbungee/api/tasks/InitialUtils.java b/RedisBungee-API/src/main/java/com/imaginarycode/minecraft/redisbungee/api/tasks/InitialUtils.java new file mode 100644 index 0000000..86e5482 --- /dev/null +++ b/RedisBungee-API/src/main/java/com/imaginarycode/minecraft/redisbungee/api/tasks/InitialUtils.java @@ -0,0 +1,136 @@ +package com.imaginarycode.minecraft.redisbungee.api.tasks; + +import com.imaginarycode.minecraft.redisbungee.api.RedisBungeePlugin; +import com.imaginarycode.minecraft.redisbungee.api.util.RedisUtil; +import com.imaginarycode.minecraft.redisbungee.api.util.io.IOUtil; +import com.imaginarycode.minecraft.redisbungee.api.util.lua.LuaManager; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.exceptions.JedisException; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Collections; + +public class InitialUtils { + + public static LuaManager.Script getTimeScript(RedisBungeePlugin plugin, LuaManager luaManager) { + return new RedisTask(plugin) { + @Override + public LuaManager.Script jedisTask(Jedis jedis) { + // This is more portable than INFO

+ String info = jedis.info(); + for (String s : info.split("\r\n")) { + if (s.startsWith("redis_version:")) { + String version = s.split(":")[1]; + plugin.logInfo(version + " <- redis version"); + if (!RedisUtil.isRedisVersionRight(version)) { + plugin.logFatal("Your version of Redis (" + version + ") is not at least version 3.0 RedisBungee requires a newer version of Redis."); + throw new RuntimeException("Unsupported Redis version detected"); + } + long uuidCacheSize = jedis.hlen("uuid-cache"); + if (uuidCacheSize > 750000) { + plugin.logInfo("Looks like you have a really big UUID cache! Run https://www.spigotmc.org/resources/redisbungeecleaner.8505/ as soon as possible."); + } + break; + } + } + return null; + } + + @Override + public LuaManager.Script clusterJedisTask(JedisCluster jedisCluster) { + // This is more portable than INFO
+ LuaManager.Script getRedisClusterTimeScript; + try { + getRedisClusterTimeScript = luaManager.createScript(IOUtil.readInputStreamAsString(getResourceAsStream("lua/get_time.lua"))); + } catch (JedisException e) { + throw new RuntimeException("possible not supported redis version", e); + } + String info = (String) luaManager.createScript(IOUtil.readInputStreamAsString(getResourceAsStream("lua/get_info.lua"))).eval(Collections.singletonList("0"), Collections.emptyList()); + for (String s : info.split("\r\n")) { + if (s.startsWith("redis_version:")) { + String version = s.split(":")[1]; + plugin.logInfo(version + " <- redis version"); + if (!RedisUtil.isRedisVersionRight(version)) { + plugin.logFatal("Your version of Redis (" + version + ") is not at least version 3.0 RedisBungee requires a newer version of Redis."); + throw new RuntimeException("Unsupported Redis version detected"); + } + long uuidCacheSize = jedisCluster.hlen("uuid-cache"); + if (uuidCacheSize > 750000) { + plugin.logInfo("Looks like you have a really big UUID cache! Run https://www.spigotmc.org/resources/redisbungeecleaner.8505/ as soon as possible."); + } + break; + } + } + return getRedisClusterTimeScript; + } + }.execute(); + } + + + public static void checkIfRecovering(RedisBungeePlugin plugin, Path dataFolder) { + new RedisTask(plugin) { + @Override + public Void jedisTask(Jedis jedis) { + Path crashFile = dataFolder.resolve("restarted_from_crash.txt"); + if (Files.exists(crashFile)) { + try { + Files.delete(crashFile); + } catch (IOException e) { + throw new RuntimeException(e); + } + plugin.logInfo("crash file was deleted"); + } else if (jedis.hexists("heartbeats", plugin.getConfiguration().getProxyId())) { + try { + long value = Long.parseLong(jedis.hget("heartbeats", plugin.getConfiguration().getProxyId())); + long redisTime = plugin.getRedisTime(jedis.time()); + if (redisTime < value + RedisUtil.PROXY_TIMEOUT) { + logImposter(plugin); + throw new RuntimeException("Possible impostor instance!"); + } + } catch (NumberFormatException ignored) { + } + } + return null; + } + + @Override + public Void clusterJedisTask(JedisCluster jedisCluster) { + Path crashFile = dataFolder.resolve("restarted_from_crash.txt"); + if (Files.exists(crashFile)) { + try { + Files.delete(crashFile); + } catch (IOException e) { + throw new RuntimeException(e); + } + plugin.logInfo("crash file was deleted"); + } else if (jedisCluster.hexists("heartbeats", plugin.getConfiguration().getProxyId())) { + try { + long value = Long.parseLong(jedisCluster.hget("heartbeats", plugin.getConfiguration().getProxyId())); + long redisTime = plugin.getRedisTime(); + + if (redisTime < value + RedisUtil.PROXY_TIMEOUT) { + logImposter(plugin); + throw new RuntimeException("Possible impostor instance!"); + } + } catch (NumberFormatException ignored) { + } + } + return null; + } + }.execute(); + } + + private static void logImposter(RedisBungeePlugin plugin) { + plugin.logFatal("You have launched a possible impostor Velocity / Bungeecord instance. Another instance is already running."); + plugin.logFatal("For data consistency reasons, RedisBungee will now disable itself."); + plugin.logFatal("If this instance is coming up from a crash, create a file in your RedisBungee plugins directory with the name 'restarted_from_crash.txt' and RedisBungee will not perform this check."); + } + + private static InputStream getResourceAsStream(String resource) { + return InitialUtils.class.getClassLoader().getResourceAsStream(resource); + } +} diff --git a/RedisBungee-API/src/main/java/com/imaginarycode/minecraft/redisbungee/api/tasks/IntegrityCheckTask.java b/RedisBungee-API/src/main/java/com/imaginarycode/minecraft/redisbungee/api/tasks/IntegrityCheckTask.java new file mode 100644 index 0000000..3fcae8b --- /dev/null +++ b/RedisBungee-API/src/main/java/com/imaginarycode/minecraft/redisbungee/api/tasks/IntegrityCheckTask.java @@ -0,0 +1,145 @@ +package com.imaginarycode.minecraft.redisbungee.api.tasks; + +import com.imaginarycode.minecraft.redisbungee.api.GenericPlayerUtils; +import com.imaginarycode.minecraft.redisbungee.api.RedisBungeePlugin; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisCluster; +import redis.clients.jedis.Pipeline; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; + +public abstract class IntegrityCheckTask extends RedisTask { + + public static int INTERVAL = 30; + public static TimeUnit TIMEUNIT = TimeUnit.SECONDS; + + + public IntegrityCheckTask(RedisBungeePlugin plugin) { + super(plugin); + } + + @Override + public Void jedisTask(Jedis jedis) { + try { + Set players = plugin.getLocalPlayersAsUuidStrings(); + Set playersInRedis = jedis.smembers("proxy:" + plugin.getConfiguration().getProxyId() + ":usersOnline"); + List lagged = plugin.getCurrentProxiesIds(true); + + // Clean up lagged players. + for (String s : lagged) { + Set laggedPlayers = jedis.smembers("proxy:" + s + ":usersOnline"); + jedis.del("proxy:" + s + ":usersOnline"); + if (!laggedPlayers.isEmpty()) { + plugin.logInfo("Cleaning up lagged proxy " + s + " (" + laggedPlayers.size() + " players)..."); + for (String laggedPlayer : laggedPlayers) { + GenericPlayerUtils.cleanUpPlayer(laggedPlayer, jedis, true); + } + } + } + + Set absentLocally = new HashSet<>(playersInRedis); + absentLocally.removeAll(players); + Set absentInRedis = new HashSet<>(players); + absentInRedis.removeAll(playersInRedis); + + for (String member : absentLocally) { + boolean found = false; + for (String proxyId : plugin.getProxiesIds()) { + if (proxyId.equals(plugin.getConfiguration().getProxyId())) continue; + if (jedis.sismember("proxy:" + proxyId + ":usersOnline", member)) { + // Just clean up the set. + found = true; + break; + } + } + if (!found) { + GenericPlayerUtils.cleanUpPlayer(member, jedis, false); + plugin.logWarn("Player found in set that was not found locally and globally: " + member); + } else { + jedis.srem("proxy:" + plugin.getConfiguration().getProxyId() + ":usersOnline", member); + plugin.logWarn("Player found in set that was not found locally, but is on another proxy: " + member); + } + } + + Pipeline pipeline = jedis.pipelined(); + + for (String player : absentInRedis) { + // Player not online according to Redis but not BungeeCord. + plugin.logWarn("Player " + player + " is on the proxy but not in Redis."); + handlePlatformPlayer(player, pipeline); + } + + pipeline.sync(); + } catch (Throwable e) { + plugin.logFatal("Unable to fix up stored player data"); + e.printStackTrace(); + } + return null; + } + + @Override + public Void clusterJedisTask(JedisCluster jedisCluster) { + try { + Set players = plugin.getLocalPlayersAsUuidStrings(); + Set playersInRedis = jedisCluster.smembers("proxy:" + plugin.getConfiguration().getProxyId() + ":usersOnline"); + List lagged = plugin.getCurrentProxiesIds(true); + + // Clean up lagged players. + for (String s : lagged) { + Set laggedPlayers = jedisCluster.smembers("proxy:" + s + ":usersOnline"); + jedisCluster.del("proxy:" + s + ":usersOnline"); + if (!laggedPlayers.isEmpty()) { + plugin.logInfo("Cleaning up lagged proxy " + s + " (" + laggedPlayers.size() + " players)..."); + for (String laggedPlayer : laggedPlayers) { + GenericPlayerUtils.cleanUpPlayer(laggedPlayer, jedisCluster, true); + } + } + } + + Set absentLocally = new HashSet<>(playersInRedis); + absentLocally.removeAll(players); + Set absentInRedis = new HashSet<>(players); + absentInRedis.removeAll(playersInRedis); + + for (String member : absentLocally) { + boolean found = false; + for (String proxyId : plugin.getProxiesIds()) { + if (proxyId.equals(plugin.getConfiguration().getProxyId())) continue; + if (jedisCluster.sismember("proxy:" + proxyId + ":usersOnline", member)) { + // Just clean up the set. + found = true; + break; + } + } + if (!found) { + GenericPlayerUtils.cleanUpPlayer(member, jedisCluster, false); + plugin.logWarn("Player found in set that was not found locally and globally: " + member); + } else { + jedisCluster.srem("proxy:" + plugin.getConfiguration().getProxyId() + ":usersOnline", member); + plugin.logWarn("Player found in set that was not found locally, but is on another proxy: " + member); + } + } + // due JedisCluster does not support pipelined. + //Pipeline pipeline = jedis.pipelined(); + + for (String player : absentInRedis) { + // Player not online according to Redis but not BungeeCord. + handlePlatformPlayer(player, jedisCluster); + } + } catch (Throwable e) { + plugin.logFatal("Unable to fix up stored player data"); + e.printStackTrace(); + } + return null; + } + + + public abstract void handlePlatformPlayer(String player, JedisCluster jedisCluster); + + public abstract void handlePlatformPlayer(String player, Pipeline pipeline); +} diff --git a/RedisBungee-API/src/main/java/com/imaginarycode/minecraft/redisbungee/api/tasks/ShutdownUtils.java b/RedisBungee-API/src/main/java/com/imaginarycode/minecraft/redisbungee/api/tasks/ShutdownUtils.java new file mode 100644 index 0000000..53d0af2 --- /dev/null +++ b/RedisBungee-API/src/main/java/com/imaginarycode/minecraft/redisbungee/api/tasks/ShutdownUtils.java @@ -0,0 +1,39 @@ +package com.imaginarycode.minecraft.redisbungee.api.tasks; + +import com.imaginarycode.minecraft.redisbungee.api.GenericPlayerUtils; +import com.imaginarycode.minecraft.redisbungee.api.RedisBungeePlugin; +import redis.clients.jedis.Jedis; +import redis.clients.jedis.JedisCluster; + +import java.util.Set; + +public class ShutdownUtils { + + public static void shutdownCleanup(RedisBungeePlugin plugin) { + new RedisTask(plugin) { + @Override + public Void jedisTask(Jedis jedis) { + jedis.hdel("heartbeats", plugin.getConfiguration().getProxyId()); + if (jedis.scard("proxy:" + plugin.getConfiguration().getProxyId() + ":usersOnline") > 0) { + Set players = jedis.smembers("proxy:" + plugin.getConfiguration().getProxyId() + ":usersOnline"); + for (String member : players) + GenericPlayerUtils.cleanUpPlayer(member, jedis, true); + } + return null; + } + + @Override + public Void clusterJedisTask(JedisCluster jedisCluster) { + jedisCluster.hdel("heartbeats", plugin.getConfiguration().getProxyId()); + if (jedisCluster.scard("proxy:" + plugin.getConfiguration().getProxyId() + ":usersOnline") > 0) { + Set players = jedisCluster.smembers("proxy:" + plugin.getConfiguration().getProxyId() + ":usersOnline"); + for (String member : players) + GenericPlayerUtils.cleanUpPlayer(member, jedisCluster, true); + } + return null; + } + }.execute(); + } + + +} diff --git a/RedisBungee-API/src/main/java/com/imaginarycode/minecraft/redisbungee/api/util/RedisUtil.java b/RedisBungee-API/src/main/java/com/imaginarycode/minecraft/redisbungee/api/util/RedisUtil.java index c2c3c21..a881a76 100644 --- a/RedisBungee-API/src/main/java/com/imaginarycode/minecraft/redisbungee/api/util/RedisUtil.java +++ b/RedisBungee-API/src/main/java/com/imaginarycode/minecraft/redisbungee/api/util/RedisUtil.java @@ -4,6 +4,7 @@ import com.google.common.annotations.VisibleForTesting; @VisibleForTesting public class RedisUtil { + public static int PROXY_TIMEOUT = 30; public static boolean isRedisVersionRight(String redisVersion) { String[] args = redisVersion.split("\\."); if (args.length < 2) { diff --git a/RedisBungee-API/src/main/resources/lua/get_cluster_info.lua b/RedisBungee-API/src/main/resources/lua/get_info.lua similarity index 100% rename from RedisBungee-API/src/main/resources/lua/get_cluster_info.lua rename to RedisBungee-API/src/main/resources/lua/get_info.lua diff --git a/RedisBungee-API/src/main/resources/lua/get_cluster_time.lua b/RedisBungee-API/src/main/resources/lua/get_time.lua similarity index 100% rename from RedisBungee-API/src/main/resources/lua/get_cluster_time.lua rename to RedisBungee-API/src/main/resources/lua/get_time.lua diff --git a/RedisBungee-Bungee/src/main/java/com/imaginarycode/minecraft/redisbungee/RedisBungeeBungeePlugin.java b/RedisBungee-Bungee/src/main/java/com/imaginarycode/minecraft/redisbungee/RedisBungeeBungeePlugin.java index 8ede890..1645d69 100644 --- a/RedisBungee-Bungee/src/main/java/com/imaginarycode/minecraft/redisbungee/RedisBungeeBungeePlugin.java +++ b/RedisBungee-Bungee/src/main/java/com/imaginarycode/minecraft/redisbungee/RedisBungeeBungeePlugin.java @@ -2,15 +2,10 @@ package com.imaginarycode.minecraft.redisbungee; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Multimap; import com.imaginarycode.minecraft.redisbungee.api.config.RedisBungeeConfiguration; -import com.imaginarycode.minecraft.redisbungee.api.tasks.HeartbeatTask; -import com.imaginarycode.minecraft.redisbungee.api.tasks.RedisTask; -import com.imaginarycode.minecraft.redisbungee.api.util.RedisUtil; -import com.imaginarycode.minecraft.redisbungee.api.util.payload.PayloadUtils; +import com.imaginarycode.minecraft.redisbungee.api.tasks.*; import com.imaginarycode.minecraft.redisbungee.commands.RedisBungeeCommands; import com.imaginarycode.minecraft.redisbungee.events.PlayerChangedServerNetworkEvent; import com.imaginarycode.minecraft.redisbungee.events.PlayerJoinedNetworkEvent; @@ -18,7 +13,6 @@ import com.imaginarycode.minecraft.redisbungee.events.PlayerLeftNetworkEvent; import com.imaginarycode.minecraft.redisbungee.events.PubSubMessageEvent; import com.imaginarycode.minecraft.redisbungee.api.*; import com.imaginarycode.minecraft.redisbungee.api.summoners.Summoner; -import com.imaginarycode.minecraft.redisbungee.api.util.io.IOUtil; import com.imaginarycode.minecraft.redisbungee.api.util.lua.LuaManager; import com.imaginarycode.minecraft.redisbungee.api.RedisBungeeMode; import com.imaginarycode.minecraft.redisbungee.api.util.uuid.NameFetcher; @@ -31,8 +25,6 @@ import net.md_5.bungee.api.connection.ProxiedPlayer; import net.md_5.bungee.api.plugin.Event; import net.md_5.bungee.api.plugin.Plugin; import redis.clients.jedis.*; -import redis.clients.jedis.exceptions.JedisConnectionException; -import redis.clients.jedis.exceptions.JedisException; import java.io.*; import java.lang.reflect.Field; @@ -55,7 +47,6 @@ public class RedisBungeeBungeePlugin extends Plugin implements RedisBungeePlugin private BungeeDataManager dataManager; private OkHttpClient httpClient; private volatile List proxiesIds; - private final AtomicInteger nagAboutServers = new AtomicInteger(); private final AtomicInteger globalPlayerCount = new AtomicInteger(); private Future integrityCheck; private Future heartbeatTask; @@ -76,46 +67,6 @@ public class RedisBungeeBungeePlugin extends Plugin implements RedisBungeePlugin return this.globalPlayerCount.get(); } - @Override - public int getCurrentCount() { - return new RedisTask(api) { - @Override - public Long jedisTask(Jedis jedis) { - long total = 0; - long redisTime = getRedisTime(jedis.time()); - Map heartBeats = jedis.hgetAll("heartbeats"); - for (Map.Entry stringStringEntry : heartBeats.entrySet()) { - String k = stringStringEntry.getKey(); - String v = stringStringEntry.getValue(); - - long heartbeatTime = Long.parseLong(v); - if (heartbeatTime + 30 >= redisTime) { - total = total + jedis.scard("proxy:" + k + ":usersOnline"); - } - } - return total; - } - - @Override - public Long clusterJedisTask(JedisCluster jedisCluster) { - long total = 0; - long redisTime = getRedisClusterTime(); - Map heartBeats = jedisCluster.hgetAll("heartbeats"); - for (Map.Entry stringStringEntry : heartBeats.entrySet()) { - String k = stringStringEntry.getKey(); - String v = stringStringEntry.getValue(); - - long heartbeatTime = Long.parseLong(v); - if (heartbeatTime + 30 >= redisTime) { - total = total + jedisCluster.scard("proxy:" + k + ":usersOnline"); - } - } - return total; - } - }.execute().intValue(); - - } - @Override public Set getLocalPlayersAsUuidStrings() { ImmutableSet.Builder builder = ImmutableSet.builder(); @@ -130,64 +81,6 @@ public class RedisBungeeBungeePlugin extends Plugin implements RedisBungeePlugin return this.dataManager; } - @Override - public Set getPlayers() { - return new RedisTask>(api) { - @Override - public Set jedisTask(Jedis jedis) { - ImmutableSet.Builder setBuilder = ImmutableSet.builder(); - try { - List keys = new ArrayList<>(); - for (String i : getProxiesIds()) { - keys.add("proxy:" + i + ":usersOnline"); - } - if (!keys.isEmpty()) { - Set users = jedis.sunion(keys.toArray(new String[0])); - if (users != null && !users.isEmpty()) { - for (String user : users) { - try { - setBuilder = setBuilder.add(UUID.fromString(user)); - } catch (IllegalArgumentException ignored) { - } - } - } - } - } catch (JedisConnectionException e) { - // Redis server has disappeared! - getLogger().log(Level.SEVERE, "Unable to get connection from pool - did your Redis server go away?", e); - throw new RuntimeException("Unable to get all players online", e); - } - return setBuilder.build(); - } - - @Override - public Set clusterJedisTask(JedisCluster jedisCluster) { - ImmutableSet.Builder setBuilder = ImmutableSet.builder(); - try { - List keys = new ArrayList<>(); - for (String i : getProxiesIds()) { - keys.add("proxy:" + i + ":usersOnline"); - } - if (!keys.isEmpty()) { - Set users = jedisCluster.sunion(keys.toArray(new String[0])); - if (users != null && !users.isEmpty()) { - for (String user : users) { - try { - setBuilder = setBuilder.add(UUID.fromString(user)); - } catch (IllegalArgumentException ignored) { - } - } - } - } - } catch (JedisConnectionException e) { - // Redis server has disappeared! - getLogger().log(Level.SEVERE, "Unable to get connection from pool - did your Redis server go away?", e); - throw new RuntimeException("Unable to get all players online", e); - } - return setBuilder.build(); - } - }.execute(); - } @Override public RedisBungeeAPI getApi() { @@ -200,186 +93,24 @@ public class RedisBungeeBungeePlugin extends Plugin implements RedisBungeePlugin } @Override - public Multimap serversToPlayers() { + public Multimap serverToPlayersCache() { try { - return serverToPlayersCache.get(SERVER_TO_PLAYERS_KEY, new RedisTask>(api) { - @Override - public Multimap jedisTask(Jedis jedis) { - ImmutableMultimap.Builder builder = ImmutableMultimap.builder(); - for (String serverId : getProxiesIds()) { - Set players = jedis.smembers("proxy:" + serverId + ":usersOnline"); - for (String player : players) { - String playerServer = jedis.hget("player:" + player, "server"); - if (playerServer == null) { - continue; - } - builder.put(playerServer, UUID.fromString(player)); - } - } - return builder.build(); - } - - @Override - public Multimap clusterJedisTask(JedisCluster jedisCluster) { - ImmutableMultimap.Builder builder = ImmutableMultimap.builder(); - for (String serverId : getProxiesIds()) { - Set players = jedisCluster.smembers("proxy:" + serverId + ":usersOnline"); - for (String player : players) { - String playerServer = jedisCluster.hget("player:" + player, "server"); - if (playerServer == null) { - continue; - } - builder.put(playerServer, UUID.fromString(player)); - } - } - return builder.build(); - } - }); + return this.serverToPlayersCache.get(SERVER_TO_PLAYERS_KEY, this::serversToPlayers); } catch (ExecutionException e) { throw new RuntimeException(e); } } - @Override - public Set getPlayersOnProxy(String proxyId) { - checkArgument(getProxiesIds().contains(proxyId), proxyId + " is not a valid proxy ID"); - return new RedisTask>(api) { - @Override - public Set jedisTask(Jedis jedis) { - Set users = jedis.smembers("proxy:" + proxyId + ":usersOnline"); - ImmutableSet.Builder builder = ImmutableSet.builder(); - for (String user : users) { - builder.add(UUID.fromString(user)); - } - return builder.build(); - } - - @Override - public Set clusterJedisTask(JedisCluster jedisCluster) { - Set users = jedisCluster.smembers("proxy:" + proxyId + ":usersOnline"); - ImmutableSet.Builder builder = ImmutableSet.builder(); - for (String user : users) { - builder.add(UUID.fromString(user)); - } - return builder.build(); - } - }.execute(); - } - - @Override - public void sendProxyCommand(String proxyId, String command) { - checkArgument(getProxiesIds().contains(proxyId) || proxyId.equals("allservers"), "proxyId is invalid"); - sendChannelMessage("redisbungee-" + proxyId, command); - } - @Override public List getProxiesIds() { return proxiesIds; } - @Override - public List getCurrentProxiesIds(boolean nag, boolean lagged) { - return new RedisTask>(api) { - @Override - public List jedisTask(Jedis jedis) { - try { - long time = getRedisTime(jedis.time()); - int nagTime = 0; - if (nag) { - nagTime = nagAboutServers.decrementAndGet(); - if (nagTime <= 0) { - nagAboutServers.set(10); - } - } - ImmutableList.Builder servers = ImmutableList.builder(); - Map heartbeats = jedis.hgetAll("heartbeats"); - for (Map.Entry entry : heartbeats.entrySet()) { - try { - long stamp = Long.parseLong(entry.getValue()); - if (lagged ? time >= stamp + 30 : time <= stamp + 30) - servers.add(entry.getKey()); - else if (nag && nagTime <= 0) { - getLogger().warning(entry.getKey() + " is " + (time - stamp) + " seconds behind! (Time not synchronized or server down?) and was removed from heartbeat."); - jedis.hdel("heartbeats", entry.getKey()); - } - } catch (NumberFormatException ignored) { - } - } - return servers.build(); - } catch (JedisConnectionException e) { - getLogger().log(Level.SEVERE, "Unable to fetch server IDs", e); - return Collections.singletonList(configuration.getProxyId()); - } - } - - @Override - public List clusterJedisTask(JedisCluster jedisCluster) { - try { - long time = getRedisClusterTime(); - int nagTime = 0; - if (nag) { - nagTime = nagAboutServers.decrementAndGet(); - if (nagTime <= 0) { - nagAboutServers.set(10); - } - } - ImmutableList.Builder servers = ImmutableList.builder(); - Map heartbeats = jedisCluster.hgetAll("heartbeats"); - for (Map.Entry entry : heartbeats.entrySet()) { - try { - long stamp = Long.parseLong(entry.getValue()); - if (lagged ? time >= stamp + 30 : time <= stamp + 30) - servers.add(entry.getKey()); - else if (nag && nagTime <= 0) { - getLogger().warning(entry.getKey() + " is " + (time - stamp) + " seconds behind! (Time not synchronized or server down?) and was removed from heartbeat."); - jedisCluster.hdel("heartbeats", entry.getKey()); - } - } catch (NumberFormatException ignored) { - } - } - return servers.build(); - } catch (JedisConnectionException e) { - getLogger().log(Level.SEVERE, "Unable to fetch server IDs", e); - return Collections.singletonList(configuration.getProxyId()); - } - } - }.execute(); - } - @Override public PubSubListener getPubSubListener() { return this.psl; } - @Override - public void sendChannelMessage(String channel, String message) { - new RedisTask(api) { - @Override - public Void jedisTask(Jedis jedis) { - try { - jedis.publish(channel, message); - } catch (JedisConnectionException e) { - // Redis server has disappeared! - getLogger().log(Level.SEVERE, "Unable to get connection from pool - did your Redis server go away?", e); - throw new RuntimeException("Unable to publish channel message", e); - } - return null; - } - - @Override - public Void clusterJedisTask(JedisCluster jedisCluster) { - try { - jedisCluster.publish(channel, message); - } catch (JedisConnectionException e) { - // Redis server has disappeared! - getLogger().log(Level.SEVERE, "Unable to get connection from pool - did your Redis server go away?", e); - throw new RuntimeException("Unable to publish channel message", e); - } - return null; - } - }.execute(); - } - @Override public void executeAsync(Runnable runnable) { this.getProxy().getScheduler().runAsync(this, runnable); @@ -450,17 +181,6 @@ public class RedisBungeeBungeePlugin extends Plugin implements RedisBungeePlugin return player.getAddress().getAddress(); } - @Override - public void sendProxyCommand(String cmd) { - checkArgument(getProxiesIds().contains(this.configuration.getProxyId()) || this.configuration.getProxyId().equals("allservers"), "proxyId is invalid"); - sendChannelMessage("redisbungee-" + this.configuration.getProxyId(), cmd); - } - - @Override - public long getRedisTime(List timeRes) { - return Long.parseLong(timeRes.get(0)); - } - @Override public void initialize() { @@ -491,245 +211,40 @@ public class RedisBungeeBungeePlugin extends Plugin implements RedisBungeePlugin UUIDFetcher.setHttpClient(httpClient); // init lua manager LuaManager luaManager = new LuaManager(this); - new RedisTask(this) { - @Override - public Void jedisTask(Jedis jedis) { - // This is more portable than INFO
- String info = jedis.info(); - for (String s : info.split("\r\n")) { - if (s.startsWith("redis_version:")) { - String version = s.split(":")[1]; - getLogger().info(version + " <- redis version"); - if (!RedisUtil.isRedisVersionRight(version)) { - getLogger().severe("Your version of Redis (" + version + ") is not at least version 3.0 RedisBungee requires a newer version of Redis."); - throw new RuntimeException("Unsupported Redis version detected"); - } - long uuidCacheSize = jedis.hlen("uuid-cache"); - if (uuidCacheSize > 750000) { - getLogger().info("Looks like you have a really big UUID cache! Run https://www.spigotmc.org/resources/redisbungeecleaner.8505/ as soon as possible."); - } - break; - } - } - return null; - } - - @Override - public Void clusterJedisTask(JedisCluster jedisCluster) { - // This is more portable than INFO
- try { - getRedisClusterTimeScript = luaManager.createScript(IOUtil.readInputStreamAsString(getResourceAsStream("lua/get_cluster_time.lua"))); - } catch (JedisException e) { - throw new RuntimeException("possible not supported redis version", e); - } - String info = (String) luaManager.createScript(IOUtil.readInputStreamAsString(getResourceAsStream("lua/get_cluster_info.lua"))).eval(Collections.singletonList("0"), Collections.emptyList()); - for (String s : info.split("\r\n")) { - if (s.startsWith("redis_version:")) { - String version = s.split(":")[1]; - getLogger().info(version + " <- redis version"); - if (!RedisUtil.isRedisVersionRight(version)) { - getLogger().severe("Your version of Redis (" + version + ") is not at least version 3.0 RedisBungee requires a newer version of Redis."); - throw new RuntimeException("Unsupported Redis version detected"); - } - long uuidCacheSize = jedisCluster.hlen("uuid-cache"); - if (uuidCacheSize > 750000) { - getLogger().info("Looks like you have a really big UUID cache! Run https://www.spigotmc.org/resources/redisbungeecleaner.8505/ as soon as possible."); - } - break; - } - } - return null; - } - }.execute(); + this.getRedisClusterTimeScript = InitialUtils.getTimeScript(this, luaManager); getLogger().info("lua manager was loaded"); // check if this proxy is recovering from a crash and start heart the beat. - new RedisTask(api) { - @Override - public Void jedisTask(Jedis jedis) { - File crashFile = new File(getDataFolder(), "restarted_from_crash.txt"); - if (crashFile.exists() && crashFile.delete()) { - getLogger().info("crash file was deleted"); - } else if (jedis.hexists("heartbeats", configuration.getProxyId())) { - try { - long value = Long.parseLong(jedis.hget("heartbeats", configuration.getProxyId())); - long redisTime = getRedisTime(jedis.time()); - if (redisTime < value + 20) { - getLogger().severe("You have launched a possible impostor Velocity / Bungeecord instance. Another instance is already running."); - getLogger().severe("For data consistency reasons, RedisBungee will now disable itself."); - getLogger().severe("If this instance is coming up from a crash, create a file in your RedisBungee plugins directory with the name 'restarted_from_crash.txt' and RedisBungee will not perform this check."); - throw new RuntimeException("Possible impostor instance!"); - } - } catch (NumberFormatException ignored) { - } - } - - return null; - } - - @Override - public Void clusterJedisTask(JedisCluster jedisCluster) { - File crashFile = new File(getDataFolder(), "restarted_from_crash.txt"); - if (crashFile.exists() && crashFile.delete()) { - getLogger().info("crash file was deleted"); - } else if (jedisCluster.hexists("heartbeats", configuration.getProxyId())) { - try { - long value = Long.parseLong(jedisCluster.hget("heartbeats", configuration.getProxyId())); - long redisTime = getRedisClusterTime(); - - if (redisTime < value + 20) { - getLogger().severe("You have launched a possible impostor Velocity / Bungeecord instance. Another instance is already running."); - getLogger().severe("For data consistency reasons, RedisBungee will now disable itself."); - getLogger().severe("If this instance is coming up from a crash, create a file in your RedisBungee plugins directory with the name 'restarted_from_crash.txt' and RedisBungee will not perform this check."); - throw new RuntimeException("Possible impostor instance!"); - } - } catch (NumberFormatException ignored) { - } - } - return null; - } - }.execute(); - + InitialUtils.checkIfRecovering(this, getDataFolder().toPath()); updateProxyIds(); - uuidTranslator = new UUIDTranslator(this); - - heartbeatTask = service.scheduleAtFixedRate(new HeartbeatTask(this, this.globalPlayerCount), 0, HeartbeatTask.INTERVAL, HeartbeatTask.REPEAT_INTERVAL_TIME_UNIT); - dataManager = new BungeeDataManager(this); getProxy().getPluginManager().registerListener(this, new RedisBungeeBungeeListener(this, configuration.getExemptAddresses())); getProxy().getPluginManager().registerListener(this, dataManager); psl = new PubSubListener(this); getProxy().getScheduler().runAsync(this, psl); - RedisTask integrityCheckRedisTask = new RedisTask(api) { + IntegrityCheckTask integrityCheckTask = new IntegrityCheckTask(this) { @Override - public Void jedisTask(Jedis jedis) { - try { - Set players = getLocalPlayersAsUuidStrings(); - Set playersInRedis = jedis.smembers("proxy:" + configuration.getProxyId() + ":usersOnline"); - List lagged = getCurrentProxiesIds(false, true); + public void handlePlatformPlayer(String player, JedisCluster jedis) { + ProxiedPlayer proxiedPlayer = ProxyServer.getInstance().getPlayer(UUID.fromString(player)); + if (proxiedPlayer == null) + return; // We'll deal with it later. - // Clean up lagged players. - for (String s : lagged) { - Set laggedPlayers = jedis.smembers("proxy:" + s + ":usersOnline"); - jedis.del("proxy:" + s + ":usersOnline"); - if (!laggedPlayers.isEmpty()) { - getLogger().info("Cleaning up lagged proxy " + s + " (" + laggedPlayers.size() + " players)..."); - for (String laggedPlayer : laggedPlayers) { - GenericPlayerUtils.cleanUpPlayer(laggedPlayer, jedis, true); - } - } - } - - Set absentLocally = new HashSet<>(playersInRedis); - absentLocally.removeAll(players); - Set absentInRedis = new HashSet<>(players); - absentInRedis.removeAll(playersInRedis); - - for (String member : absentLocally) { - boolean found = false; - for (String proxyId : getProxiesIds()) { - if (proxyId.equals(configuration.getProxyId())) continue; - if (jedis.sismember("proxy:" + proxyId + ":usersOnline", member)) { - // Just clean up the set. - found = true; - break; - } - } - if (!found) { - GenericPlayerUtils.cleanUpPlayer(member, jedis, false); - getLogger().warning("Player found in set that was not found locally and globally: " + member); - } else { - jedis.srem("proxy:" + configuration.getProxyId() + ":usersOnline", member); - getLogger().warning("Player found in set that was not found locally, but is on another proxy: " + member); - } - } - - Pipeline pipeline = jedis.pipelined(); - - for (String player : absentInRedis) { - // Player not online according to Redis but not BungeeCord. - getLogger().warning("Player " + player + " is on the proxy but not in Redis."); - - ProxiedPlayer proxiedPlayer = ProxyServer.getInstance().getPlayer(UUID.fromString(player)); - if (proxiedPlayer == null) - continue; // We'll deal with it later. - - BungeePlayerUtils.createPlayer(proxiedPlayer, pipeline, false); - } - - pipeline.sync(); - } catch (Throwable e) { - getLogger().log(Level.SEVERE, "Unable to fix up stored player data", e); - } - return null; + BungeePlayerUtils.createPlayer(proxiedPlayer, jedis, false); } @Override - public Void clusterJedisTask(JedisCluster jedisCluster) { - try { - Set players = getLocalPlayersAsUuidStrings(); - Set playersInRedis = jedisCluster.smembers("proxy:" + configuration.getProxyId() + ":usersOnline"); - List lagged = getCurrentProxiesIds(false, true); + public void handlePlatformPlayer(String player, Pipeline pipeline) { + ProxiedPlayer proxiedPlayer = ProxyServer.getInstance().getPlayer(UUID.fromString(player)); + if (proxiedPlayer == null) + return; // We'll deal with it later. - // Clean up lagged players. - for (String s : lagged) { - Set laggedPlayers = jedisCluster.smembers("proxy:" + s + ":usersOnline"); - jedisCluster.del("proxy:" + s + ":usersOnline"); - if (!laggedPlayers.isEmpty()) { - getLogger().info("Cleaning up lagged proxy " + s + " (" + laggedPlayers.size() + " players)..."); - for (String laggedPlayer : laggedPlayers) { - GenericPlayerUtils.cleanUpPlayer(laggedPlayer, jedisCluster, true); - } - } - } - - Set absentLocally = new HashSet<>(playersInRedis); - absentLocally.removeAll(players); - Set absentInRedis = new HashSet<>(players); - absentInRedis.removeAll(playersInRedis); - - for (String member : absentLocally) { - boolean found = false; - for (String proxyId : getProxiesIds()) { - if (proxyId.equals(configuration.getProxyId())) continue; - if (jedisCluster.sismember("proxy:" + proxyId + ":usersOnline", member)) { - // Just clean up the set. - found = true; - break; - } - } - if (!found) { - GenericPlayerUtils.cleanUpPlayer(member, jedisCluster, false); - getLogger().warning("Player found in set that was not found locally and globally: " + member); - } else { - jedisCluster.srem("proxy:" + configuration.getProxyId() + ":usersOnline", member); - getLogger().warning("Player found in set that was not found locally, but is on another proxy: " + member); - } - } - // due JedisCluster does not support pipelined. - //Pipeline pipeline = jedis.pipelined(); - - for (String player : absentInRedis) { - // Player not online according to Redis but not BungeeCord. - getLogger().warning("Player " + player + " is on the proxy but not in Redis."); - - ProxiedPlayer proxiedPlayer = ProxyServer.getInstance().getPlayer(UUID.fromString(player)); - if (proxiedPlayer == null) - continue; // We'll deal with it later. - - BungeePlayerUtils.createPlayer(proxiedPlayer, jedisCluster, true); - } - - } catch (Throwable e) { - getLogger().log(Level.SEVERE, "Unable to fix up stored player data", e); - } - return null; + BungeePlayerUtils.createPlayer(proxiedPlayer, pipeline, false); } }; - integrityCheck = service.scheduleAtFixedRate(integrityCheckRedisTask::execute, 0, 30, TimeUnit.SECONDS); + integrityCheck = service.scheduleAtFixedRate(integrityCheckTask::execute, 0, IntegrityCheckTask.INTERVAL, IntegrityCheckTask.TIMEUNIT); // register plugin messages channel. getProxy().registerChannel("legacy:redisbungee"); @@ -762,32 +277,8 @@ public class RedisBungeeBungeePlugin extends Plugin implements RedisBungeePlugin if (heartbeatTask != null) { heartbeatTask.cancel(true); } - getProxy().getPluginManager().unregisterListeners(this); - - new RedisTask(api) { - @Override - public Void jedisTask(Jedis jedis) { - jedis.hdel("heartbeats", configuration.getProxyId()); - if (jedis.scard("proxy:" + configuration.getProxyId() + ":usersOnline") > 0) { - Set players = jedis.smembers("proxy:" + configuration.getProxyId() + ":usersOnline"); - for (String member : players) - GenericPlayerUtils.cleanUpPlayer(member, jedis, true); - } - return null; - } - - @Override - public Void clusterJedisTask(JedisCluster jedisCluster) { - jedisCluster.hdel("heartbeats", configuration.getProxyId()); - if (jedisCluster.scard("proxy:" + configuration.getProxyId() + ":usersOnline") > 0) { - Set players = jedisCluster.smembers("proxy:" + configuration.getProxyId() + ":usersOnline"); - for (String member : players) - GenericPlayerUtils.cleanUpPlayer(member, jedisCluster, true); - } - return null; - } - }.execute(); + ShutdownUtils.shutdownCleanup(this); try { this.summoner.close(); } catch (IOException e) { @@ -801,47 +292,19 @@ public class RedisBungeeBungeePlugin extends Plugin implements RedisBungeePlugin return this.summoner; } - - @Override - public void kickPlayer(UUID playerUniqueId, String message) { - // first handle on origin proxy if player not found publish the payload - if (!dataManager.handleKick(playerUniqueId, message)) { - new RedisTask(api) { - @Override - public Void jedisTask(Jedis jedis) { - PayloadUtils.kickPlayerPayload(playerUniqueId, message, jedis); - return null; - } - - @Override - public Void clusterJedisTask(JedisCluster jedisCluster) { - PayloadUtils.kickPlayerPayload(playerUniqueId, message, jedisCluster); - return null; - } - }.execute(); - } - } - - @Override - public void kickPlayer(String playerName, String message) { - // fetch the uuid - UUID playerUUID = this.uuidTranslator.getTranslatedUuid(playerName, true); - kickPlayer(playerUUID, message); - } - @Override public RedisBungeeMode getRedisBungeeMode() { return this.redisBungeeMode; } @Override - public Long getRedisClusterTime() { + public Long getRedisTime() { return getRedisTime((List) this.getRedisClusterTimeScript.eval(Collections.singletonList("0"), Collections.emptyList())); } @Override public void updateProxyIds() { - proxiesIds = getCurrentProxiesIds(true, false); + proxiesIds = getCurrentProxiesIds(false); } @Override diff --git a/RedisBungee-Velocity/src/main/java/com/imaginarycode/minecraft/redisbungee/RedisBungeeVelocityPlugin.java b/RedisBungee-Velocity/src/main/java/com/imaginarycode/minecraft/redisbungee/RedisBungeeVelocityPlugin.java index d8b4080..38113dc 100644 --- a/RedisBungee-Velocity/src/main/java/com/imaginarycode/minecraft/redisbungee/RedisBungeeVelocityPlugin.java +++ b/RedisBungee-Velocity/src/main/java/com/imaginarycode/minecraft/redisbungee/RedisBungeeVelocityPlugin.java @@ -2,19 +2,13 @@ package com.imaginarycode.minecraft.redisbungee; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Multimap; import com.google.inject.Inject; import com.imaginarycode.minecraft.redisbungee.api.*; import com.imaginarycode.minecraft.redisbungee.api.config.RedisBungeeConfiguration; import com.imaginarycode.minecraft.redisbungee.api.summoners.Summoner; -import com.imaginarycode.minecraft.redisbungee.api.tasks.HeartbeatTask; -import com.imaginarycode.minecraft.redisbungee.api.tasks.RedisTask; -import com.imaginarycode.minecraft.redisbungee.api.util.RedisUtil; -import com.imaginarycode.minecraft.redisbungee.api.util.payload.PayloadUtils; -import com.imaginarycode.minecraft.redisbungee.api.util.io.IOUtil; +import com.imaginarycode.minecraft.redisbungee.api.tasks.*; import com.imaginarycode.minecraft.redisbungee.api.util.lua.LuaManager; import com.imaginarycode.minecraft.redisbungee.api.util.uuid.NameFetcher; import com.imaginarycode.minecraft.redisbungee.api.util.uuid.UUIDFetcher; @@ -40,13 +34,10 @@ import com.velocitypowered.api.scheduler.ScheduledTask; import org.slf4j.Logger; import redis.clients.jedis.*; import redis.clients.jedis.exceptions.JedisConnectionException; -import redis.clients.jedis.exceptions.JedisException; -import static com.google.common.base.Preconditions.checkArgument; import java.io.*; import java.net.InetAddress; -import java.nio.file.Files; import java.nio.file.Path; import java.util.*; import java.util.concurrent.*; @@ -66,12 +57,11 @@ public class RedisBungeeVelocityPlugin implements RedisBungeePlugin { private final VelocityDataManager dataManager; private final OkHttpClient httpClient; private volatile List proxiesIds; - private final AtomicInteger nagAboutServers = new AtomicInteger(); private final AtomicInteger globalPlayerCount = new AtomicInteger(); private ScheduledTask integrityCheck; private ScheduledTask heartbeatTask; - private LuaManager.Script getRedisClusterTimeScript; + private final LuaManager.Script getRedisTimeScript; private static final Object SERVER_TO_PLAYERS_KEY = new Object(); public static final List IDENTIFIERS = List.of( @@ -99,112 +89,10 @@ public class RedisBungeeVelocityPlugin implements RedisBungeePlugin { } this.api = new RedisBungeeAPI(this); LuaManager luaManager = new LuaManager(this); - new RedisTask(this) { - @Override - public Void jedisTask(Jedis jedis) { - // This is more portable than INFO
- String info = jedis.info(); - for (String s : info.split("\r\n")) { - if (s.startsWith("redis_version:")) { - String version = s.split(":")[1]; - getLogger().info("{} <- redis version", version); - if (!RedisUtil.isRedisVersionRight(version)) { - getLogger().error("Your version of Redis ({}) is not at least version 6.0 RedisBungee requires a newer version of Redis.", version); - throw new RuntimeException("Unsupported Redis version detected"); - } - long uuidCacheSize = jedis.hlen("uuid-cache"); - if (uuidCacheSize > 750000) { - getLogger().info("Looks like you have a really big UUID cache! Run https://www.spigotmc.org/resources/redisbungeecleaner.8505/ as soon as possible."); - } - break; - } - } - return null; - } - - @Override - public Void clusterJedisTask(JedisCluster jedisCluster) { - // This is more portable than INFO
- try { - getRedisClusterTimeScript = luaManager.createScript(IOUtil.readInputStreamAsString(getResourceAsStream("lua/get_cluster_time.lua"))); - } catch (JedisException e) { - throw new RuntimeException("possible not supported redis version", e); - } - String info = (String) luaManager.createScript(IOUtil.readInputStreamAsString(getResourceAsStream("lua/get_cluster_info.lua"))).eval(Collections.singletonList("0"), Collections.emptyList()); - for (String s : info.split("\r\n")) { - if (s.startsWith("redis_version:")) { - String version = s.split(":")[1]; - getLogger().info("{} <- redis version", version); - if (!RedisUtil.isRedisVersionRight(version)) { - getLogger().error("Your version of Redis ({}) is not at least version 6.0 RedisBungee requires a newer version of Redis.", version); - throw new RuntimeException("Unsupported Redis version detected"); - } - long uuidCacheSize = jedisCluster.hlen("uuid-cache"); - if (uuidCacheSize > 750000) { - getLogger().info("Looks like you have a really big UUID cache! Run https://www.spigotmc.org/resources/redisbungeecleaner.8505/ as soon as possible."); - } - break; - } - } - return null; - } - }.execute(); + this.getRedisTimeScript = InitialUtils.getTimeScript(this, luaManager); getLogger().info("lua manager was loaded"); // check if this proxy is recovering from a crash and start heart the beat. - new RedisTask(api) { - @Override - public Void jedisTask(Jedis jedis) { - Path crashFile = getDataFolder().resolve("restarted_from_crash.txt"); - if (Files.exists(crashFile)) { - try { - Files.delete(crashFile); - } catch (IOException e) { - throw new RuntimeException(e); - } - getLogger().info("crash file was deleted"); - } else if (jedis.hexists("heartbeats", configuration.getProxyId())) { - try { - long value = Long.parseLong(jedis.hget("heartbeats", configuration.getProxyId())); - long redisTime = getRedisTime(jedis.time()); - if (redisTime < value + 20) { - getLogger().error("You have launched a possible impostor Velocity / Bungeecord instance. Another instance is already running."); - getLogger().error("For data consistency reasons, RedisBungee will now disable itself."); - getLogger().error("If this instance is coming up from a crash, create a file in your RedisBungee plugins directory with the name 'restarted_from_crash.txt' and RedisBungee will not perform this check."); - throw new RuntimeException("Possible impostor instance!"); - } - } catch (NumberFormatException ignored) { - } - } - return null; - } - - @Override - public Void clusterJedisTask(JedisCluster jedisCluster) { - Path crashFile = getDataFolder().resolve("restarted_from_crash.txt"); - if (Files.exists(crashFile)) { - try { - Files.delete(crashFile); - } catch (IOException e) { - throw new RuntimeException(e); - } - getLogger().info("crash file was deleted"); - } else if (jedisCluster.hexists("heartbeats", configuration.getProxyId())) { - try { - long value = Long.parseLong(jedisCluster.hget("heartbeats", configuration.getProxyId())); - long redisTime = getRedisClusterTime(); - - if (redisTime < value + 20) { - getLogger().error("You have launched a possible impostor Velocity / Bungeecord instance. Another instance is already running."); - getLogger().error("For data consistency reasons, RedisBungee will now disable itself."); - getLogger().error("If this instance is coming up from a crash, create a file in your RedisBungee plugins directory with the name 'restarted_from_crash.txt' and RedisBungee will not perform this check."); - throw new RuntimeException("Possible impostor instance!"); - } - } catch (NumberFormatException ignored) { - } - } - return null; - } - }.execute(); + InitialUtils.checkIfRecovering(this, getDataFolder()); uuidTranslator = new UUIDTranslator(this); dataManager = new VelocityDataManager(this); psl = new PubSubListener(this); @@ -230,45 +118,6 @@ public class RedisBungeeVelocityPlugin implements RedisBungeePlugin { return this.globalPlayerCount.get(); } - @Override - public int getCurrentCount() { - return new RedisTask(api) { - @Override - public Long jedisTask(Jedis jedis) { - long total = 0; - long redisTime = getRedisTime(jedis.time()); - Map heartBeats = jedis.hgetAll("heartbeats"); - for (Map.Entry stringStringEntry : heartBeats.entrySet()) { - String k = stringStringEntry.getKey(); - String v = stringStringEntry.getValue(); - - long heartbeatTime = Long.parseLong(v); - if (heartbeatTime + 30 >= redisTime) { - total = total + jedis.scard("proxy:" + k + ":usersOnline"); - } - } - return total; - } - - @Override - public Long clusterJedisTask(JedisCluster jedisCluster) { - long total = 0; - long redisTime = getRedisClusterTime(); - Map heartBeats = jedisCluster.hgetAll("heartbeats"); - for (Map.Entry stringStringEntry : heartBeats.entrySet()) { - String k = stringStringEntry.getKey(); - String v = stringStringEntry.getValue(); - - long heartbeatTime = Long.parseLong(v); - if (heartbeatTime + 30 >= redisTime) { - total = total + jedisCluster.scard("proxy:" + k + ":usersOnline"); - } - } - return total; - } - }.execute().intValue(); - - } @Override public Set getLocalPlayersAsUuidStrings() { @@ -284,66 +133,6 @@ public class RedisBungeeVelocityPlugin implements RedisBungeePlugin { return this.dataManager; } - @Override - public Set getPlayers() { - return new RedisTask>(api) { - @Override - public Set jedisTask(Jedis jedis) { - ImmutableSet.Builder setBuilder = ImmutableSet.builder(); - try { - List keys = new ArrayList<>(); - for (String i : getProxiesIds()) { - keys.add("proxy:" + i + ":usersOnline"); - } - if (!keys.isEmpty()) { - Set users = jedis.sunion(keys.toArray(new String[0])); - if (users != null && !users.isEmpty()) { - for (String user : users) { - try { - setBuilder = setBuilder.add(UUID.fromString(user)); - } catch (IllegalArgumentException ignored) { - } - } - } - } - } catch (JedisConnectionException e) { - // Redis server has disappeared! - getLogger().error("Unable to get connection from pool - did your Redis server go away?", e); - throw new RuntimeException("Unable to get all players online", e); - } - return setBuilder.build(); - } - - @Override - public Set clusterJedisTask(JedisCluster jedisCluster) { - ImmutableSet.Builder setBuilder = ImmutableSet.builder(); - try { - List keys = new ArrayList<>(); - for (String i : getProxiesIds()) { - keys.add("proxy:" + i + ":usersOnline"); - } - if (!keys.isEmpty()) { - Set users = jedisCluster.sunion(keys.toArray(new String[0])); - if (users != null && !users.isEmpty()) { - for (String user : users) { - try { - setBuilder = setBuilder.add(UUID.fromString(user)); - } catch (IllegalArgumentException ignored) { - } - } - } - } - } catch (JedisConnectionException e) { - // Redis server has disappeared! - getLogger().error("Unable to get connection from pool - did your Redis server go away?", e); - throw new RuntimeException("Unable to get all players online", e); - } - return setBuilder.build(); - } - }.execute(); - } - - @Override public Summoner getSummoner() { return this.jedisSummoner; @@ -360,187 +149,24 @@ public class RedisBungeeVelocityPlugin implements RedisBungeePlugin { } @Override - public Multimap serversToPlayers() { + public Multimap serverToPlayersCache() { try { - return serverToPlayersCache.get(SERVER_TO_PLAYERS_KEY, new RedisTask>(api) { - @Override - public Multimap jedisTask(Jedis jedis) { - ImmutableMultimap.Builder builder = ImmutableMultimap.builder(); - for (String serverId : getProxiesIds()) { - Set players = jedis.smembers("proxy:" + serverId + ":usersOnline"); - for (String player : players) { - String playerServer = jedis.hget("player:" + player, "server"); - if (playerServer == null) { - continue; - } - builder.put(playerServer, UUID.fromString(player)); - } - } - return builder.build(); - } - - @Override - public Multimap clusterJedisTask(JedisCluster jedisCluster) { - ImmutableMultimap.Builder builder = ImmutableMultimap.builder(); - for (String serverId : getProxiesIds()) { - Set players = jedisCluster.smembers("proxy:" + serverId + ":usersOnline"); - for (String player : players) { - String playerServer = jedisCluster.hget("player:" + player, "server"); - if (playerServer == null) { - continue; - } - builder.put(playerServer, UUID.fromString(player)); - } - } - return builder.build(); - } - }); + return this.serverToPlayersCache.get(SERVER_TO_PLAYERS_KEY, this::serversToPlayers); } catch (ExecutionException e) { throw new RuntimeException(e); } } - @Override - public Set getPlayersOnProxy(String proxyId) { - checkArgument(getProxiesIds().contains(proxyId), proxyId + " is not a valid proxy ID"); - return new RedisTask>(api) { - @Override - public Set jedisTask(Jedis jedis) { - Set users = jedis.smembers("proxy:" + proxyId + ":usersOnline"); - ImmutableSet.Builder builder = ImmutableSet.builder(); - for (String user : users) { - builder.add(UUID.fromString(user)); - } - return builder.build(); - } - - @Override - public Set clusterJedisTask(JedisCluster jedisCluster) { - Set users = jedisCluster.smembers("proxy:" + proxyId + ":usersOnline"); - ImmutableSet.Builder builder = ImmutableSet.builder(); - for (String user : users) { - builder.add(UUID.fromString(user)); - } - return builder.build(); - } - }.execute(); - } - - @Override - public void sendProxyCommand(String serverId, String command) { - checkArgument(getProxiesIds().contains(serverId) || serverId.equals("allservers"), "proxyId is invalid"); - sendChannelMessage("redisbungee-" + serverId, command); - } - @Override public List getProxiesIds() { return proxiesIds; } - - @Override - public List getCurrentProxiesIds(boolean nag, boolean lagged) { - return new RedisTask>(api) { - @Override - public List jedisTask(Jedis jedis) { - try { - long time = getRedisTime(jedis.time()); - int nagTime = 0; - if (nag) { - nagTime = nagAboutServers.decrementAndGet(); - if (nagTime <= 0) { - nagAboutServers.set(10); - } - } - ImmutableList.Builder servers = ImmutableList.builder(); - Map heartbeats = jedis.hgetAll("heartbeats"); - for (Map.Entry entry : heartbeats.entrySet()) { - try { - long stamp = Long.parseLong(entry.getValue()); - if (lagged ? time >= stamp + 30 : time <= stamp + 30) - servers.add(entry.getKey()); - else if (nag && nagTime <= 0) { - getLogger().warn("{} is {} seconds behind! (Time not synchronized or server down?) and was removed from heartbeat.", entry.getKey(), (time - stamp)); - jedis.hdel("heartbeats", entry.getKey()); - } - } catch (NumberFormatException ignored) { - } - } - return servers.build(); - } catch (JedisConnectionException e) { - getLogger().error("Unable to fetch server IDs", e); - return Collections.singletonList(configuration.getProxyId()); - } - } - - @Override - public List clusterJedisTask(JedisCluster jedisCluster) { - try { - long time = getRedisClusterTime(); - int nagTime = 0; - if (nag) { - nagTime = nagAboutServers.decrementAndGet(); - if (nagTime <= 0) { - nagAboutServers.set(10); - } - } - ImmutableList.Builder servers = ImmutableList.builder(); - Map heartbeats = jedisCluster.hgetAll("heartbeats"); - for (Map.Entry entry : heartbeats.entrySet()) { - try { - long stamp = Long.parseLong(entry.getValue()); - if (lagged ? time >= stamp + 30 : time <= stamp + 30) - servers.add(entry.getKey()); - else if (nag && nagTime <= 0) { - getLogger().warn("{} is {} seconds behind! (Time not synchronized or server down?) and was removed from heartbeat.", entry.getKey(), (time - stamp)); - jedisCluster.hdel("heartbeats", entry.getKey()); - } - } catch (NumberFormatException ignored) { - } - } - return servers.build(); - } catch (JedisConnectionException e) { - getLogger().error("Unable to fetch server IDs", e); - return Collections.singletonList(configuration.getProxyId()); - } - } - }.execute(); - } - @Override public PubSubListener getPubSubListener() { return this.psl; } - @Override - public void sendChannelMessage(String channel, String message) { - new RedisTask(api) { - @Override - public Void jedisTask(Jedis jedis) { - try { - jedis.publish(channel, message); - } catch (JedisConnectionException e) { - // Redis server has disappeared! - getLogger().error("Unable to get connection from pool - did your Redis server go away?", e); - throw new RuntimeException("Unable to publish channel message", e); - } - return null; - } - - @Override - public Void clusterJedisTask(JedisCluster jedisCluster) { - try { - jedisCluster.publish(channel, message); - } catch (JedisConnectionException e) { - // Redis server has disappeared! - getLogger().error("Unable to get connection from pool - did your Redis server go away?", e); - throw new RuntimeException("Unable to publish channel message", e); - } - return null; - } - }.execute(); - } - @Override public void executeAsync(Runnable runnable) { this.getProxy().getScheduler().buildTask(this, runnable).schedule(); @@ -611,18 +237,6 @@ public class RedisBungeeVelocityPlugin implements RedisBungeePlugin { return player.getRemoteAddress().getAddress(); } - @Override - public void sendProxyCommand(String cmd) { - checkArgument(getProxiesIds().contains(this.configuration.getProxyId()) || this.configuration.getProxyId().equals("allservers"), "proxyId is invalid"); - sendChannelMessage("redisbungee-" + this.configuration.getProxyId(), cmd); - } - - @Override - public long getRedisTime(List timeRes) { - return Long.parseLong(timeRes.get(0)); - } - - @Override public void initialize() { updateProxyIds(); @@ -632,130 +246,26 @@ public class RedisBungeeVelocityPlugin implements RedisBungeePlugin { getProxy().getEventManager().register(this, new RedisBungeeVelocityListener(this, configuration.getExemptAddresses())); getProxy().getEventManager().register(this, dataManager); getProxy().getScheduler().buildTask(this, psl).schedule(); - RedisTask integrityCheckRedisTask = new RedisTask(api) { + + IntegrityCheckTask integrityCheckTask = new IntegrityCheckTask(this) { @Override - public Void jedisTask(Jedis jedis) { - try { - Set players = getLocalPlayersAsUuidStrings(); - Set playersInRedis = jedis.smembers("proxy:" + configuration.getProxyId() + ":usersOnline"); - List lagged = getCurrentProxiesIds(false, true); - - // Clean up lagged players. - for (String s : lagged) { - Set laggedPlayers = jedis.smembers("proxy:" + s + ":usersOnline"); - jedis.del("proxy:" + s + ":usersOnline"); - if (!laggedPlayers.isEmpty()) { - getLogger().info("Cleaning up lagged proxy {} ({} players)...", s, laggedPlayers.size()); - for (String laggedPlayer : laggedPlayers) { - GenericPlayerUtils.cleanUpPlayer(laggedPlayer, jedis, true); - } - } - } - - Set absentLocally = new HashSet<>(playersInRedis); - absentLocally.removeAll(players); - Set absentInRedis = new HashSet<>(players); - absentInRedis.removeAll(playersInRedis); - - for (String member : absentLocally) { - boolean found = false; - for (String proxyId : getProxiesIds()) { - if (proxyId.equals(configuration.getProxyId())) continue; - if (jedis.sismember("proxy:" + proxyId + ":usersOnline", member)) { - // Just clean up the set. - found = true; - break; - } - } - if (!found) { - GenericPlayerUtils.cleanUpPlayer(member, jedis, false); - getLogger().warn("Player found in set that was not found locally and globally: {}", member); - } else { - jedis.srem("proxy:" + configuration.getProxyId() + ":usersOnline", member); - getLogger().warn("Player found in set that was not found locally, but is on another proxy: {}", member); - } - } - - Pipeline pipeline = jedis.pipelined(); - - for (String player : absentInRedis) { - // Player not online according to Redis but not BungeeCord. - getLogger().warn("Player {} is on the proxy but not in Redis.", player); - - Player playerProxied = getProxy().getPlayer(UUID.fromString(player)).orElse(null); - if (playerProxied == null) - continue; // We'll deal with it later. - - VelocityPlayerUtils.createPlayer(playerProxied, pipeline, false); - } - - pipeline.sync(); - } catch (Throwable e) { - getLogger().error("Unable to fix up stored player data", e); - } - return null; + public void handlePlatformPlayer(String player, JedisCluster jedisCluster) { + Player playerProxied = getProxy().getPlayer(UUID.fromString(player)).orElse(null); + if (playerProxied == null) + return; // We'll deal with it later. + VelocityPlayerUtils.createPlayer(playerProxied, jedisCluster, false); } @Override - public Void clusterJedisTask(JedisCluster jedisCluster) { - try { - Set players = getLocalPlayersAsUuidStrings(); - Set playersInRedis = jedisCluster.smembers("proxy:" + configuration.getProxyId() + ":usersOnline"); - List lagged = getCurrentProxiesIds(false, true); - - // Clean up lagged players. - for (String s : lagged) { - Set laggedPlayers = jedisCluster.smembers("proxy:" + s + ":usersOnline"); - jedisCluster.del("proxy:" + s + ":usersOnline"); - if (!laggedPlayers.isEmpty()) { - getLogger().info("Cleaning up lagged proxy {} ({} players)...", s, laggedPlayers.size()); - for (String laggedPlayer : laggedPlayers) { - GenericPlayerUtils.cleanUpPlayer(laggedPlayer, jedisCluster, true); - } - } - } - - Set absentLocally = new HashSet<>(playersInRedis); - absentLocally.removeAll(players); - Set absentInRedis = new HashSet<>(players); - absentInRedis.removeAll(playersInRedis); - - for (String member : absentLocally) { - boolean found = false; - for (String proxyId : getProxiesIds()) { - if (proxyId.equals(configuration.getProxyId())) continue; - if (jedisCluster.sismember("proxy:" + proxyId + ":usersOnline", member)) { - // Just clean up the set. - found = true; - break; - } - } - if (!found) { - GenericPlayerUtils.cleanUpPlayer(member, jedisCluster, false); - getLogger().warn("Player found in set that was not found locally and globally: {}", member); - } else { - jedisCluster.srem("proxy:" + configuration.getProxyId() + ":usersOnline", member); - getLogger().warn("Player found in set that was not found locally, but is on another proxy: {}", member); - } - } - - for (String player : absentInRedis) { - // Player not online according to Redis but not BungeeCord. - getLogger().warn("Player {} is on the proxy but not in Redis.", player); - - Player playerProxied = getProxy().getPlayer(UUID.fromString(player)).orElse(null); - if (playerProxied == null) - continue; // We'll deal with it later. - - VelocityPlayerUtils.createPlayer(playerProxied, jedisCluster, false); - } - } catch (Throwable e) { - getLogger().error("Unable to fix up stored player data", e); - } - return null; + public void handlePlatformPlayer(String player, Pipeline pipeline) { + Player playerProxied = getProxy().getPlayer(UUID.fromString(player)).orElse(null); + if (playerProxied == null) + return; // We'll deal with it later. + VelocityPlayerUtils.createPlayer(playerProxied, pipeline, false); } }; - integrityCheck = getProxy().getScheduler().buildTask(this, integrityCheckRedisTask::execute).repeat(30, TimeUnit.SECONDS).schedule(); + integrityCheck = getProxy().getScheduler().buildTask(this, integrityCheckTask::execute).repeat(30, TimeUnit.SECONDS).schedule(); + // register plugin messages IDENTIFIERS.forEach(getProxy().getChannelRegistrar()::register); @@ -789,29 +299,7 @@ public class RedisBungeeVelocityPlugin implements RedisBungeePlugin { if (heartbeatTask != null) { heartbeatTask.cancel(); } - new RedisTask(api) { - @Override - public Void jedisTask(Jedis jedis) { - jedis.hdel("heartbeats", configuration.getProxyId()); - if (jedis.scard("proxy:" + configuration.getProxyId() + ":usersOnline") > 0) { - Set players = jedis.smembers("proxy:" + configuration.getProxyId() + ":usersOnline"); - for (String member : players) - GenericPlayerUtils.cleanUpPlayer(member, jedis, true); - } - return null; - } - - @Override - public Void clusterJedisTask(JedisCluster jedisCluster) { - jedisCluster.hdel("heartbeats", configuration.getProxyId()); - if (jedisCluster.scard("proxy:" + configuration.getProxyId() + ":usersOnline") > 0) { - Set players = jedisCluster.smembers("proxy:" + configuration.getProxyId() + ":usersOnline"); - for (String member : players) - GenericPlayerUtils.cleanUpPlayer(member, jedisCluster, true); - } - return null; - } - }.execute(); + ShutdownUtils.shutdownCleanup(this); try { this.jedisSummoner.close(); } catch (IOException e) { @@ -833,32 +321,6 @@ public class RedisBungeeVelocityPlugin implements RedisBungeePlugin { this.redisBungeeMode = mode; } - @Override - public void kickPlayer(UUID playerUniqueId, String message) { - // first handle on origin proxy if player not found publish the payload - if (!dataManager.handleKick(playerUniqueId, message)) { - new RedisTask(api) { - @Override - public Void jedisTask(Jedis jedis) { - PayloadUtils.kickPlayerPayload(playerUniqueId, message, jedis); - return null; - } - - @Override - public Void clusterJedisTask(JedisCluster jedisCluster) { - PayloadUtils.kickPlayerPayload(playerUniqueId, message, jedisCluster); - return null; - } - }.execute(); - } - } - - @Override - public void kickPlayer(String playerName, String message) { - // fetch the uuid - UUID playerUUID = this.uuidTranslator.getTranslatedUuid(playerName, true); - kickPlayer(playerUUID, message); - } @Override public RedisBungeeMode getRedisBungeeMode() { @@ -866,13 +328,13 @@ public class RedisBungeeVelocityPlugin implements RedisBungeePlugin { } @Override - public Long getRedisClusterTime() { - return getRedisTime((List) this.getRedisClusterTimeScript.eval(Collections.singletonList("0"), Collections.emptyList())); + public Long getRedisTime() { + return getRedisTime((List) this.getRedisTimeScript.eval(Collections.singletonList("0"), Collections.emptyList())); } @Override public void updateProxyIds() { - this.proxiesIds = this.getCurrentProxiesIds(true, false); + this.proxiesIds = this.getCurrentProxiesIds(false); } @Subscribe