new ConnectionController!

This commit is contained in:
mohammed jasem alaajel 2020-12-23 21:04:23 +04:00
parent 3920b0e844
commit a2b58798f9
6 changed files with 84 additions and 113 deletions

View File

@ -8,7 +8,7 @@ import ch.njol.skript.util.Date;
import ch.njol.skript.util.Getter; import ch.njol.skript.util.Getter;
import net.limework.rediskript.commands.CommandReloadRedis; import net.limework.rediskript.commands.CommandReloadRedis;
import net.limework.rediskript.events.RedisMessageEvent; import net.limework.rediskript.events.RedisMessageEvent;
import net.limework.rediskript.managers.RedisManager; import net.limework.rediskript.managers.RedisController;
import net.limework.rediskript.skript.elements.*; import net.limework.rediskript.skript.elements.*;
import org.bukkit.plugin.java.JavaPlugin; import org.bukkit.plugin.java.JavaPlugin;
@ -16,14 +16,14 @@ import java.io.IOException;
public class RediSkript extends JavaPlugin { public class RediSkript extends JavaPlugin {
//Redis manager private RedisController redisController;
private RedisManager rm;
public void startRedis(boolean reload) { public void reloadRedis(){
if (reload) { reloadConfig(); } redisController.shutdown();
rm = new RedisManager(this); redisController = new RedisController(this);
rm.start();
} }
public void registerSyntax() { public void registerSyntax() {
SkriptAddon addon = Skript.registerAddon(this); SkriptAddon addon = Skript.registerAddon(this);
try { try {
@ -60,16 +60,16 @@ public class RediSkript extends JavaPlugin {
@Override @Override
public void onEnable() { public void onEnable() {
saveDefaultConfig(); saveDefaultConfig();
startRedis(false); redisController = new RedisController(this);
getServer().getPluginCommand("reloadredis").setExecutor(new CommandReloadRedis(this)); getServer().getPluginCommand("reloadredis").setExecutor(new CommandReloadRedis(this));
registerSyntax(); registerSyntax();
} }
@Override @Override
public void onDisable() { public void onDisable() {
if (rm != null) rm.shutdown(); if (redisController != null) redisController.shutdown();
} }
public RedisManager getRedisManager() { public RedisController getRC() {
return rm; return redisController;
} }
} }

View File

@ -27,7 +27,7 @@ public class CommandReloadRedis implements CommandExecutor {
new BukkitRunnable() { new BukkitRunnable() {
@Override @Override
public void run() { public void run() {
plugin.getRedisManager().reload(); plugin.reloadRedis();
} }
}.runTaskAsynchronously(plugin); }.runTaskAsynchronously(plugin);

View File

@ -8,46 +8,43 @@ import net.limework.rediskript.events.RedisMessageEvent;
import org.bukkit.Bukkit; import org.bukkit.Bukkit;
import org.bukkit.ChatColor; import org.bukkit.ChatColor;
import org.bukkit.configuration.Configuration; import org.bukkit.configuration.Configuration;
import org.bukkit.scheduler.BukkitTask;
import org.cryptomator.siv.UnauthenticCiphertextException; import org.cryptomator.siv.UnauthenticCiphertextException;
import org.json.JSONArray; import org.json.JSONArray;
import org.json.JSONObject; import org.json.JSONObject;
import redis.clients.jedis.BinaryJedis; import redis.clients.jedis.*;
import redis.clients.jedis.BinaryJedisPubSub;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.exceptions.JedisConnectionException; import redis.clients.jedis.exceptions.JedisConnectionException;
import javax.crypto.IllegalBlockSizeException; import javax.crypto.IllegalBlockSizeException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Base64; import java.util.Base64;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
public class RedisManager extends BinaryJedisPubSub implements Runnable { public class RedisController extends BinaryJedisPubSub implements Runnable {
private ExecutorService RedisReconnector;
private RediSkript plugin;
private JedisPool jedisPool;
private ExecutorService RedisService;
//sub //Jedis Pool to be used by every another class.
private BinaryJedis subscribeJedis; private final JedisPool jedisPool;
private List<String> channels;
private AtomicBoolean isShuttingDown = new AtomicBoolean(false); //this seems useless unless tls is OFF!
private Encryption encryption; private final Encryption encryption;
private byte[][] channelsInByte;
private final AtomicBoolean isConnectionBroken;
private final RediSkript plugin;
private BukkitTask ConnectionTask;
public RedisManager(RediSkript plugin) { public RedisController(RediSkript plugin) {
this.plugin = plugin; this.plugin = plugin;
Configuration config = this.plugin.getConfig(); Configuration config = plugin.getConfig();
JedisPoolConfig JConfig = new JedisPoolConfig(); JedisPoolConfig JConfig = new JedisPoolConfig();
int maxConnections = config.getInt("Redis.MaxConnections"); int maxConnections = config.getInt("Redis.MaxConnections");
if (maxConnections < 2) { maxConnections = 2; } if (maxConnections < 2) {
maxConnections = 2;
}
JConfig.setMaxTotal(maxConnections); JConfig.setMaxTotal(maxConnections);
JConfig.setMaxIdle(maxConnections); JConfig.setMaxIdle(maxConnections);
JConfig.setMinIdle(1); JConfig.setMinIdle(1);
@ -58,50 +55,34 @@ public class RedisManager extends BinaryJedisPubSub implements Runnable {
config.getInt("Redis.TimeOut"), config.getInt("Redis.TimeOut"),
config.getString("Redis.Password"), config.getString("Redis.Password"),
config.getBoolean("Redis.useTLS")); config.getBoolean("Redis.useTLS"));
RedisReconnector = Executors.newSingleThreadExecutor();
RedisService = Executors.newSingleThreadExecutor();
try {
this.subscribeJedis = this.jedisPool.getResource();
} catch (Exception ignored) {}
this.channels = config.getStringList("Channels");
encryption = new Encryption(config); encryption = new Encryption(config);
setupChannels(config);
} isConnectionBroken = new AtomicBoolean(true);
//Start the main task on async thread
public void start() { ConnectionTask = plugin.getServer().getScheduler().runTaskTimerAsynchronously(plugin,this, 0 , 20 * 5 );
this.RedisReconnector.execute(this);
} }
@Override @Override
public void run() { public void run() {
while (!isShuttingDown.get() && plugin.isEnabled()) { if (!isConnectionBroken.get()) {
try { return;
plugin.getLogger().info(ChatColor.translateAlternateColorCodes('&', "&cConnecting to redis..."));
if (!this.subscribeJedis.isConnected()) this.subscribeJedis = this.jedisPool.getResource();
plugin.getLogger().info(ChatColor.translateAlternateColorCodes('&', "&aRedis connected!"));
byte[][] channelsInByte = new byte[channels.size()][1];
for (int x = 0; x < channels.size(); x++) {
channelsInByte[x] = channels.get(x).getBytes(StandardCharsets.UTF_8);
}
this.subscribeJedis.subscribe(this, channelsInByte);
} catch (Exception e) {
if (isShuttingDown.get() || !plugin.isEnabled()) {
return;
}
plugin.getLogger().warning(ChatColor.translateAlternateColorCodes('&', "&cConnection to redis has failed! &cReconnecting..."));
if (this.subscribeJedis != null) {
this.subscribeJedis.close();
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} }
System.out.println("connecting to redis!");
try (Jedis jedis = jedisPool.getResource()) {
isConnectionBroken.set(false);
System.out.println("connected to redis!");
jedis.subscribe(this, channelsInByte);
} catch (Exception e) {
isConnectionBroken.set(true);
e.printStackTrace();
}
}
public void shutdown() {
ConnectionTask.cancel();
this.unsubscribe();
jedisPool.close();
} }
@Override @Override
@ -170,7 +151,7 @@ public class RedisManager extends BinaryJedisPubSub implements Runnable {
//I will add this once someone tells me how to remove from Skript variable //I will add this once someone tells me how to remove from Skript variable
//because using SET operation has issues with inconvertible types (Double and Long) //because using SET operation has issues with inconvertible types (Double and Long)
//variable = (Variable) Variables.getVariable(variableNames.get(i).toString(), null, false); //variable = (Variable) Variables.getVariable(variableNames.get(i).toString(), null, false);
// variable.change(null, (Object[]) inputValue, Changer.ChangeMode.REMOVE); // variable.change(null, (Object[]) inputValue, Changer.ChangeMode.REMOVE);
break; break;
case "SET": case "SET":
Variables.setVariable(variableNames.get(i).toString(), inputValue, null, false); Variables.setVariable(variableNames.get(i).toString(), inputValue, null, false);
@ -186,6 +167,7 @@ public class RedisManager extends BinaryJedisPubSub implements Runnable {
} }
} }
public void sendMessage(String[] message, String channel) { public void sendMessage(String[] message, String channel) {
JSONObject json = new JSONObject(); JSONObject json = new JSONObject();
json.put("Messages", new JSONArray(message)); json.put("Messages", new JSONArray(message));
@ -193,6 +175,7 @@ public class RedisManager extends BinaryJedisPubSub implements Runnable {
json.put("Date", System.currentTimeMillis()); //for unique string every time & PING calculations json.put("Date", System.currentTimeMillis()); //for unique string every time & PING calculations
finishSendMessage(json, channel); finishSendMessage(json, channel);
} }
public void sendVariables(String[] variableNames, String[] variableValues, String channel, String operation) { public void sendVariables(String[] variableNames, String[] variableValues, String channel, String operation) {
JSONObject json = new JSONObject(); JSONObject json = new JSONObject();
json.put("Names", new JSONArray(variableNames)); json.put("Names", new JSONArray(variableNames));
@ -209,8 +192,8 @@ public class RedisManager extends BinaryJedisPubSub implements Runnable {
public void finishSendMessage(JSONObject json, String channel) { public void finishSendMessage(JSONObject json, String channel) {
try { try {
byte[] message; byte[] message;
if (this.getEncryption().isEncryptionEnabled()) { if (encryption.isEncryptionEnabled()) {
message = this.getEncryption().encrypt(json.toString()); message = encryption.encrypt(json.toString());
} else { } else {
message = json.toString().getBytes(StandardCharsets.UTF_8); message = json.toString().getBytes(StandardCharsets.UTF_8);
} }
@ -218,51 +201,39 @@ public class RedisManager extends BinaryJedisPubSub implements Runnable {
//sending a redis message blocks main thread if there's no more connections available //sending a redis message blocks main thread if there's no more connections available
//so to avoid issues, it's best to do it always on separate thread //so to avoid issues, it's best to do it always on separate thread
if (plugin.isEnabled()) { if (plugin.isEnabled()) {
this.getRedisService().execute(() -> { plugin.getServer().getScheduler().runTaskAsynchronously(plugin, () -> {
BinaryJedis j = this.getJedisPool().getResource(); try (BinaryJedis j = jedisPool.getResource()) {
j.publish(channel.getBytes(StandardCharsets.UTF_8), message); j.publish(channel.getBytes(StandardCharsets.UTF_8), message);
j.close(); } catch (Exception e) {
System.out.println("Error sending redis message!");
e.printStackTrace();
}
}); });
} else { } else {
//execute sending of redis message on the main thread if plugin is disabling //execute sending of redis message on the main thread if plugin is disabling
//so it can still process the sending //so it can still process the sending
BinaryJedis j = this.getJedisPool().getResource(); try (BinaryJedis j = jedisPool.getResource()) {
j.publish(channel.getBytes(StandardCharsets.UTF_8), message); j.publish(channel.getBytes(StandardCharsets.UTF_8), message);
j.close(); } catch (Exception e) {
e.printStackTrace();
}
} }
} catch (JedisConnectionException exception) { } catch (JedisConnectionException exception) {
exception.printStackTrace(); exception.printStackTrace();
} }
} }
public void shutdown() { private void setupChannels(Configuration config) {
List<String> channels = config.getStringList("Channels");
this.isShuttingDown.set(true); channelsInByte = new byte[channels.size()][1];
for (int x = 0; x < channels.size(); x++) {
if (this.subscribeJedis != null) { channelsInByte[x] = channels.get(x).getBytes(StandardCharsets.UTF_8);
this.unsubscribe();
this.subscribeJedis.close();
} }
if (this.jedisPool != null) {
jedisPool.close();
}
this.RedisReconnector.shutdown();
this.RedisService.shutdown();
}
public void reload() {
this.shutdown();
plugin.startRedis(true);
} }
public JedisPool getJedisPool() { public Boolean isRedisConnectionOffline() {
return jedisPool; return isConnectionBroken.get();
} }
public Encryption getEncryption() {
return encryption;
}
public ExecutorService getRedisService() { return RedisService; }
} }

View File

@ -35,7 +35,7 @@ public class EffSendMessage extends Effect {
Bukkit.getLogger().warning(ChatColor.translateAlternateColorCodes('&', "&2[&aRediSkript&a] &cChannel was empty. Please check your code.")); Bukkit.getLogger().warning(ChatColor.translateAlternateColorCodes('&', "&2[&aRediSkript&a] &cChannel was empty. Please check your code."));
return; return;
} }
plugin.getRedisManager().sendMessage(message, channel); plugin.getRC().sendMessage(message, channel);
} }
@Override @Override

View File

@ -66,10 +66,10 @@ public class ExprVariableInChannel extends SimpleExpression<Object> {
} }
} }
String operation = mode.toString(); String operation = mode.toString();
plugin.getRedisManager().sendVariables(name.getAll(e), values, channel.getSingle(e), operation); plugin.getRC().sendVariables(name.getAll(e), values, channel.getSingle(e), operation);
break; break;
case DELETE: case DELETE:
plugin.getRedisManager().sendVariables(name.getAll(e), null, channel.getSingle(e), "SET"); plugin.getRC().sendVariables(name.getAll(e), null, channel.getSingle(e), "SET");
break; break;
} }
} }

View File

@ -1,7 +1,7 @@
main: net.limework.rediskript.RediSkript main: net.limework.rediskript.RediSkript
name: ${project.name} name: ${project.name}
version: ${project.version} version: ${project.version}
authors: [Govindas, ham1255, DaemonicKing] authors: [Govindas, ham1255, DaemonicKing, limework.net]
api-version: 1.13 api-version: 1.13
depend: [Skript] depend: [Skript]
commands: commands: