Added Reconnection for channels sub

This commit is contained in:
ham1255 2020-05-10 23:10:45 +04:00
parent d82f7f4bd8
commit e634985b87
5 changed files with 46 additions and 56 deletions

View File

@ -7,13 +7,11 @@
</component> </component>
<component name="ChangeListManager"> <component name="ChangeListManager">
<list default="true" id="61139119-6327-48a6-9183-0df6346ed8d8" name="Default Changelist" comment=""> <list default="true" id="61139119-6327-48a6-9183-0df6346ed8d8" name="Default Changelist" comment="">
<change afterPath="$PROJECT_DIR$/.idea/vcs.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/artifacts/SkLimework_jar.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/artifacts/SkLimework_jar.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" /> <change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/pom.xml" beforeDir="false" afterPath="$PROJECT_DIR$/pom.xml" afterDir="false" /> <change beforePath="$PROJECT_DIR$/src/main/java/net/limework/skLimework/AddonPlugin.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/net/limework/skLimework/AddonPlugin.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/net/limework/skLimework/DoNotUse/Tester.java" beforeDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/net/limework/skLimework/Events/RedisSub.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/net/limework/skLimework/Events/RedisSub.java" afterDir="false" /> <change beforePath="$PROJECT_DIR$/src/main/java/net/limework/skLimework/Events/RedisSub.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/net/limework/skLimework/Events/RedisSub.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/java/net/limework/skLimework/elements/EffSendMessage.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/net/limework/skLimework/elements/EffSendMessage.java" afterDir="false" /> <change beforePath="$PROJECT_DIR$/src/main/java/net/limework/skLimework/elements/EffSendMessage.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/net/limework/skLimework/elements/EffSendMessage.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/resources/config.yml" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/resources/config.yml" afterDir="false" />
</list> </list>
<option name="EXCLUDED_CONVERTED_TO_IGNORED" value="true" /> <option name="EXCLUDED_CONVERTED_TO_IGNORED" value="true" />
<option name="SHOW_DIALOG" value="false" /> <option name="SHOW_DIALOG" value="false" />

View File

@ -67,6 +67,7 @@ public class AddonPlugin extends JavaPlugin {
this.getConfig().getBoolean("Redis.useSSL")); this.getConfig().getBoolean("Redis.useSSL"));
redisSub = new RedisSub(this, jedisPool.getResource(), this.getConfig().getStringList("Channels")); redisSub = new RedisSub(this, jedisPool.getResource(), this.getConfig().getStringList("Channels"));
service = Executors.newFixedThreadPool(this.getConfig().getInt("Redis.Threads")); service = Executors.newFixedThreadPool(this.getConfig().getInt("Redis.Threads"));
service.execute(redisSub);
Bukkit.getLogger().info("[Govindas limework Addon] was enabled!"); Bukkit.getLogger().info("[Govindas limework Addon] was enabled!");
} }
@ -74,7 +75,7 @@ public class AddonPlugin extends JavaPlugin {
@Override @Override
public void onDisable(){ public void onDisable(){
redisSub.unSubAndCloseConnection(); redisSub.shutdown();
service.shutdown(); service.shutdown();
try { service.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); } catch (InterruptedException e) { e.printStackTrace(); } try { service.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); } catch (InterruptedException e) { e.printStackTrace(); }
jedisPool.close(); jedisPool.close();

View File

@ -1,29 +0,0 @@
package net.limework.skLimework.DoNotUse;
import org.json.JSONObject;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisPubSub;
public class Tester {
public static void main(String[] args) throws InterruptedException {
JedisPoolConfig jconfig = new JedisPoolConfig();
jconfig.setMaxTotal(100);
jconfig.setMaxIdle(100);
jconfig.setMinIdle(1);
JedisPool d = new JedisPool(jconfig, "192.168.0.112", 6379, 400, "yHy0d2zdBlRmaSPj3CiBwEv5V3XxBTLTrCsGW7ntBnzhfxPxXJS6Q1aTtR6DSfAtCZr2VxWnsungXHTcF94a4bsWEpGAvjL6XMU");
Jedis dd = d.getResource();
JSONObject J = new JSONObject();
J.put("Message", "something::something::something");
int x = 0;
while (true) {
x++;
System.out.println(x);
dd.publish("fs", J.toString() );
if (x == 1000) break;
Thread.sleep(1000);
}
}
}

View File

@ -10,26 +10,48 @@ import redis.clients.jedis.exceptions.JedisConnectionException;
import java.util.List; import java.util.List;
public class RedisSub extends JedisPubSub {
public class RedisSub extends JedisPubSub implements Runnable{
private AddonPlugin plugin; private AddonPlugin plugin;
private Jedis j; private Jedis j;
private String[] channels;
public RedisSub(AddonPlugin plugin, Jedis j, List<String> channels) { public RedisSub(AddonPlugin plugin, Jedis j, List<String> channels) {
this.plugin = plugin; this.plugin = plugin;
this.j = j; this.j = j;
String[] ss = channels.toArray(new String[0]); this.channels = channels.toArray(new String[0]);
plugin.getServer().getScheduler().runTaskAsynchronously(plugin, }
() -> {
@Override
public void run(){
try{ try{
this.j.subscribe(this, ss); this.j.subscribe(this, channels);
} catch (JedisConnectionException je){ } catch (JedisConnectionException je){
this.unSubAndCloseConnection(); plugin.getLogger().warning("Lost connection to redis!");
Bukkit.broadcastMessage("Redis Went down!"); newJedis();
}
} }
});
private void newJedis() {
this.unsubscribe();
this.j.close();
while (true){
try {
plugin.getLogger().info("reconnecting to Redis!");
this.j = plugin.getJedisPool().getResource();
plugin.getLogger().info("Connected!");
break;
}catch (JedisConnectionException e){
plugin.getLogger().warning("reconnecting to Redis has Failed! retrying in 4 seconds!");
try { Thread.sleep(4000);}catch (InterruptedException ignored){}
} }
}
plugin.getJedisExecutionService().execute(this);
}
@Override @Override
public void onMessage(String channel, String message) { public void onMessage(String channel, String message) {
@ -44,8 +66,9 @@ public class RedisSub extends JedisPubSub {
} }
public void unSubAndCloseConnection(){ public void shutdown(){
this.unsubscribe(); this.unsubscribe();
j.close(); j.close();
} }
} }

View File

@ -35,11 +35,7 @@ public class EffSendMessage extends Effect {
return; return;
} }
plugin.getJedisExecutionService().execute(() -> { plugin.getJedisExecutionService().execute(() -> {
Jedis j; Jedis j = plugin.getJedisPool().getResource();
try {j = plugin.getJedisPool().getResource();}catch (JedisConnectionException e){
Bukkit.broadcastMessage("Redis is down!!! dont send messages");
return;
}
JSONObject json = new JSONObject(); JSONObject json = new JSONObject();
try { try {
json.put("Message", message); json.put("Message", message);
@ -63,4 +59,5 @@ public class EffSendMessage extends Effect {
this.message = (Expression<String>) expressions[1]; this.message = (Expression<String>) expressions[1];
return true; return true;
} }
} }