Migrate from the consumer model to ExecutorService. The consumer was not flexible and was less reliable.

This commit is contained in:
Tux 2014-09-21 13:55:14 -04:00
parent 0dda93bfc8
commit e0ee2ada21
9 changed files with 148 additions and 182 deletions

View File

@ -134,7 +134,7 @@
<dependency>
<groupId>net.md-5</groupId>
<artifactId>bungeecord-api</artifactId>
<version>1.7-SNAPSHOT</version>
<version>1.8-SNAPSHOT</version>
<type>jar</type>
<scope>provided</scope>
</dependency>

View File

@ -10,7 +10,6 @@ import com.google.common.base.Functions;
import com.google.common.collect.*;
import com.google.common.io.ByteStreams;
import com.google.gson.Gson;
import com.imaginarycode.minecraft.redisbungee.consumerevents.PlayerLoggedInConsumerEvent;
import com.imaginarycode.minecraft.redisbungee.events.PubSubMessageEvent;
import com.imaginarycode.minecraft.redisbungee.util.UUIDTranslator;
@ -26,13 +25,8 @@ import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisException;
import java.io.*;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
@ -48,8 +42,6 @@ public final class RedisBungee extends Plugin {
@Getter
private JedisPool pool;
@Getter
private RedisBungeeConsumer consumer;
@Getter
private UUIDTranslator uuidTranslator;
@Getter
private static Gson gson = new Gson();
@ -57,6 +49,8 @@ public final class RedisBungee extends Plugin {
private String serverId;
@Getter
private DataManager dataManager;
@Getter
private ExecutorService service;
private static RedisBungeeAPI api;
private static PubSubListener psl = null;
@ -258,9 +252,7 @@ public final class RedisBungee extends Plugin {
serverIds = getCurrentServerIds();
}
}, 0, 3, TimeUnit.SECONDS);
consumer = new RedisBungeeConsumer(this);
dataManager = new DataManager(this);
getProxy().getScheduler().runAsync(this, consumer);
if (configuration.getBoolean("register-bungee-commands", true)) {
getProxy().getPluginManager().registerCommand(this, new RedisBungeeCommands.GlistCommand(this));
getProxy().getPluginManager().registerCommand(this, new RedisBungeeCommands.FindCommand(this));
@ -312,7 +304,7 @@ public final class RedisBungee extends Plugin {
// Player not online according to Redis but not BungeeCord. Fire another consumer event.
getLogger().warning("Player " + player + " is on the proxy but not in Redis.");
consumer.queue(new PlayerLoggedInConsumerEvent(getProxy().getPlayer(UUID.fromString(player))));
tmpRsc.sadd("proxy:" + serverId + ":usersOnline", player);
}
} finally {
pool.returnResource(tmpRsc);
@ -328,8 +320,16 @@ public final class RedisBungee extends Plugin {
if (pool != null) {
// Poison the PubSub listener
getProxy().getScheduler().cancel(this);
getLogger().info("Waiting for consumer to finish writing data...");
consumer.stop();
getLogger().info("Waiting for all tasks to finish.");
service.shutdown();
try {
if (!service.awaitTermination(60, TimeUnit.SECONDS)) {
service.shutdownNow();
}
} catch (InterruptedException ignored) {
}
Jedis tmpRsc = pool.getResource();
try {
tmpRsc.hdel("heartbeats", serverId);
@ -400,7 +400,7 @@ public final class RedisBungee extends Plugin {
Jedis rsc = null;
try {
rsc = pool.getResource();
rsc.exists(String.valueOf(System.currentTimeMillis()));
rsc.ping();
// If that worked, now we can check for an existing, alive Bungee:
File crashFile = new File(getDataFolder(), "restarted_from_crash.txt");
if (crashFile.exists())
@ -417,6 +417,23 @@ public final class RedisBungee extends Plugin {
} catch (NumberFormatException ignored) {
}
}
FutureTask<Void> task2 = new FutureTask<>(new Callable<Void>() {
@Override
public Void call() throws Exception {
service = Executors.newFixedThreadPool(16);
return null;
}
});
getProxy().getScheduler().runAsync(this, task2);
try {
task2.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("Unable to create executor", e);
}
getLogger().log(Level.INFO, "Successfully connected to Redis.");
} catch (JedisConnectionException e) {
if (rsc != null)

View File

@ -1,84 +0,0 @@
/**
* Copyright © 2013 tuxed <write@imaginarycode.com>
* This work is free. You can redistribute it and/or modify it under the
* terms of the Do What The Fuck You Want To Public License, Version 2,
* as published by Sam Hocevar. See http://www.wtfpl.net/ for more details.
*/
package com.imaginarycode.minecraft.redisbungee;
import com.imaginarycode.minecraft.redisbungee.consumerevents.ConsumerEvent;
import com.imaginarycode.minecraft.redisbungee.consumerevents.PlayerChangedServerConsumerEvent;
import com.imaginarycode.minecraft.redisbungee.consumerevents.PlayerLoggedInConsumerEvent;
import com.imaginarycode.minecraft.redisbungee.consumerevents.PlayerLoggedOffConsumerEvent;
import lombok.RequiredArgsConstructor;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@RequiredArgsConstructor
public class RedisBungeeConsumer implements Runnable {
private final RedisBungee plugin;
private final BlockingQueue<ConsumerEvent> consumerQueue = new LinkedBlockingQueue<>();
private boolean stopped = false;
@Override
public void run() {
Jedis jedis = plugin.getPool().getResource();
try {
while (!stopped) {
ConsumerEvent event;
try {
event = consumerQueue.take();
} catch (InterruptedException e) {
continue;
}
handle(event, jedis);
}
for (ConsumerEvent event : consumerQueue)
handle(event, jedis);
consumerQueue.clear();
} finally {
plugin.getPool().returnResource(jedis);
}
}
private void handle(ConsumerEvent event, Jedis jedis) {
if (event instanceof PlayerLoggedInConsumerEvent) {
PlayerLoggedInConsumerEvent event1 = (PlayerLoggedInConsumerEvent) event;
jedis.sadd("proxy:" + RedisBungee.getApi().getServerId() + ":usersOnline", event1.getPlayer().getUniqueId().toString());
jedis.hset("player:" + event1.getPlayer().getUniqueId().toString(), "online", "0");
jedis.hset("player:" + event1.getPlayer().getUniqueId().toString(), "ip", event1.getPlayer().getAddress().getAddress().getHostAddress());
plugin.getUuidTranslator().persistInfo(event1.getPlayer().getName(), event1.getPlayer().getUniqueId(), jedis);
jedis.hset("player:" + event1.getPlayer().getUniqueId().toString(), "proxy", plugin.getServerId());
jedis.publish("redisbungee-data", RedisBungee.getGson().toJson(new DataManager.DataManagerMessage<>(
event1.getPlayer().getUniqueId(), DataManager.DataManagerMessage.Action.JOIN,
new DataManager.LoginPayload(event1.getPlayer().getAddress().getAddress()))));
} else if (event instanceof PlayerLoggedOffConsumerEvent) {
PlayerLoggedOffConsumerEvent event1 = (PlayerLoggedOffConsumerEvent) event;
long timestamp = System.currentTimeMillis();
jedis.hset("player:" + event1.getPlayer().getUniqueId().toString(), "online", String.valueOf(timestamp));
RedisUtil.cleanUpPlayer(event1.getPlayer().getUniqueId().toString(), jedis);
jedis.publish("redisbungee-data", RedisBungee.getGson().toJson(new DataManager.DataManagerMessage<>(
event1.getPlayer().getUniqueId(), DataManager.DataManagerMessage.Action.LEAVE,
new DataManager.LogoutPayload(timestamp))));
} else if (event instanceof PlayerChangedServerConsumerEvent) {
PlayerChangedServerConsumerEvent event1 = (PlayerChangedServerConsumerEvent) event;
jedis.hset("player:" + event1.getPlayer().getUniqueId().toString(), "server", event1.getNewServer().getName());
jedis.publish("redisbungee-data", RedisBungee.getGson().toJson(new DataManager.DataManagerMessage<>(
event1.getPlayer().getUniqueId(), DataManager.DataManagerMessage.Action.SERVER_CHANGE,
new DataManager.ServerChangePayload(event1.getNewServer().getName()))));
}
}
public void queue(ConsumerEvent event) {
if (!stopped)
consumerQueue.add(event);
}
public void stop() {
stopped = true;
while (!consumerQueue.isEmpty()) ;
}
}

View File

@ -10,10 +10,8 @@ import com.google.common.base.Joiner;
import com.google.common.io.ByteArrayDataInput;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import com.imaginarycode.minecraft.redisbungee.consumerevents.PlayerChangedServerConsumerEvent;
import com.imaginarycode.minecraft.redisbungee.consumerevents.PlayerLoggedInConsumerEvent;
import com.imaginarycode.minecraft.redisbungee.consumerevents.PlayerLoggedOffConsumerEvent;
import com.imaginarycode.minecraft.redisbungee.events.PubSubMessageEvent;
import com.imaginarycode.minecraft.redisbungee.util.RedisCallable;
import lombok.AllArgsConstructor;
import net.md_5.bungee.api.ChatColor;
import net.md_5.bungee.api.ServerPing;
@ -27,6 +25,7 @@ import net.md_5.bungee.event.EventPriority;
import redis.clients.jedis.Jedis;
import java.util.*;
import java.util.concurrent.Callable;
@AllArgsConstructor
public class RedisBungeeListener implements Listener {
@ -48,7 +47,20 @@ public class RedisBungeeListener implements Listener {
}
}
plugin.getConsumer().queue(new PlayerLoggedInConsumerEvent(event.getPlayer()));
plugin.getService().submit((Callable<Void>) new RedisCallable<Void>(plugin) {
@Override
protected Void call(Jedis jedis) {
jedis.sadd("proxy:" + RedisBungee.getApi().getServerId() + ":usersOnline", event.getPlayer().getUniqueId().toString());
jedis.hset("player:" + event.getPlayer().getUniqueId().toString(), "online", "0");
jedis.hset("player:" + event.getPlayer().getUniqueId().toString(), "ip", event.getPlayer().getAddress().getAddress().getHostAddress());
plugin.getUuidTranslator().persistInfo(event.getPlayer().getName(), event.getPlayer().getUniqueId(), jedis);
jedis.hset("player:" + event.getPlayer().getUniqueId().toString(), "proxy", plugin.getServerId());
jedis.publish("redisbungee-data", RedisBungee.getGson().toJson(new DataManager.DataManagerMessage<>(
event.getPlayer().getUniqueId(), DataManager.DataManagerMessage.Action.JOIN,
new DataManager.LoginPayload(event.getPlayer().getAddress().getAddress()))));
return null;
}
});
} finally {
plugin.getPool().returnResource(rsc);
}
@ -56,23 +68,50 @@ public class RedisBungeeListener implements Listener {
@EventHandler
public void onPlayerDisconnect(final PlayerDisconnectEvent event) {
plugin.getConsumer().queue(new PlayerLoggedOffConsumerEvent(event.getPlayer()));
plugin.getService().submit((Callable<Void>) new RedisCallable<Void>(plugin) {
@Override
protected Void call(Jedis jedis) {
long timestamp = System.currentTimeMillis();
jedis.hset("player:" + event.getPlayer().getUniqueId().toString(), "online", String.valueOf(timestamp));
RedisUtil.cleanUpPlayer(event.getPlayer().getUniqueId().toString(), jedis);
jedis.publish("redisbungee-data", RedisBungee.getGson().toJson(new DataManager.DataManagerMessage<>(
event.getPlayer().getUniqueId(), DataManager.DataManagerMessage.Action.LEAVE,
new DataManager.LogoutPayload(timestamp))));
return null;
}
});
}
@EventHandler
public void onServerChange(final ServerConnectedEvent event) {
plugin.getConsumer().queue(new PlayerChangedServerConsumerEvent(event.getPlayer(), event.getServer().getInfo()));
plugin.getService().submit((Callable<Void>) new RedisCallable<Void>(plugin) {
@Override
protected Void call(Jedis jedis) {
jedis.hset("player:" + event.getPlayer().getUniqueId().toString(), "server", event.getServer().getInfo().getName());
jedis.publish("redisbungee-data", RedisBungee.getGson().toJson(new DataManager.DataManagerMessage<>(
event.getPlayer().getUniqueId(), DataManager.DataManagerMessage.Action.SERVER_CHANGE,
new DataManager.ServerChangePayload(event.getServer().getInfo().getName()))));
return null;
}
});
}
@EventHandler(priority = EventPriority.LOWEST)
public void onPing(ProxyPingEvent event) {
ServerPing old = event.getResponse();
ServerPing reply = new ServerPing();
reply.setPlayers(new ServerPing.Players(old.getPlayers().getMax(), plugin.getCount(), old.getPlayers().getSample()));
reply.setDescription(old.getDescription());
reply.setFavicon(old.getFaviconObject());
reply.setVersion(old.getVersion());
event.setResponse(reply);
public void onPing(final ProxyPingEvent event) {
event.registerIntent(plugin);
plugin.getProxy().getScheduler().runAsync(plugin, new Runnable() {
@Override
public void run() {
ServerPing old = event.getResponse();
ServerPing reply = new ServerPing();
reply.setPlayers(new ServerPing.Players(old.getPlayers().getMax(), plugin.getCount(), old.getPlayers().getSample()));
reply.setDescription(old.getDescription());
reply.setFavicon(old.getFaviconObject());
reply.setVersion(old.getVersion());
event.setResponse(reply);
event.completeIntent(plugin);
}
});
}
@EventHandler

View File

@ -1,10 +0,0 @@
/**
* Copyright © 2013 tuxed <write@imaginarycode.com>
* This work is free. You can redistribute it and/or modify it under the
* terms of the Do What The Fuck You Want To Public License, Version 2,
* as published by Sam Hocevar. See http://www.wtfpl.net/ for more details.
*/
package com.imaginarycode.minecraft.redisbungee.consumerevents;
public interface ConsumerEvent {
}

View File

@ -1,24 +0,0 @@
/**
* Copyright © 2013 tuxed <write@imaginarycode.com>
* This work is free. You can redistribute it and/or modify it under the
* terms of the Do What The Fuck You Want To Public License, Version 2,
* as published by Sam Hocevar. See http://www.wtfpl.net/ for more details.
*/
package com.imaginarycode.minecraft.redisbungee.consumerevents;
import lombok.Getter;
import net.md_5.bungee.api.config.ServerInfo;
import net.md_5.bungee.api.connection.ProxiedPlayer;
@Getter
public class PlayerChangedServerConsumerEvent implements ConsumerEvent {
private final ProxiedPlayer player;
private final ServerInfo oldServer;
private final ServerInfo newServer;
public PlayerChangedServerConsumerEvent(ProxiedPlayer player, ServerInfo newServer) {
this.player = player;
this.newServer = newServer;
this.oldServer = player.getServer() != null ? player.getServer().getInfo() : null;
}
}

View File

@ -1,17 +0,0 @@
/**
* Copyright © 2013 tuxed <write@imaginarycode.com>
* This work is free. You can redistribute it and/or modify it under the
* terms of the Do What The Fuck You Want To Public License, Version 2,
* as published by Sam Hocevar. See http://www.wtfpl.net/ for more details.
*/
package com.imaginarycode.minecraft.redisbungee.consumerevents;
import lombok.AllArgsConstructor;
import lombok.Getter;
import net.md_5.bungee.api.connection.ProxiedPlayer;
@AllArgsConstructor
@Getter
public class PlayerLoggedInConsumerEvent implements ConsumerEvent {
private final ProxiedPlayer player;
}

View File

@ -1,17 +0,0 @@
/**
* Copyright © 2013 tuxed <write@imaginarycode.com>
* This work is free. You can redistribute it and/or modify it under the
* terms of the Do What The Fuck You Want To Public License, Version 2,
* as published by Sam Hocevar. See http://www.wtfpl.net/ for more details.
*/
package com.imaginarycode.minecraft.redisbungee.consumerevents;
import lombok.AllArgsConstructor;
import lombok.Getter;
import net.md_5.bungee.api.connection.ProxiedPlayer;
@AllArgsConstructor
@Getter
public class PlayerLoggedOffConsumerEvent implements ConsumerEvent {
private final ProxiedPlayer player;
}

View File

@ -0,0 +1,62 @@
/**
* Copyright © 2013 tuxed <write@imaginarycode.com>
* This work is free. You can redistribute it and/or modify it under the
* terms of the Do What The Fuck You Want To Public License, Version 2,
* as published by Sam Hocevar. See http://www.wtfpl.net/ for more details.
*/
package com.imaginarycode.minecraft.redisbungee.util;
import com.imaginarycode.minecraft.redisbungee.RedisBungee;
import lombok.AllArgsConstructor;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisConnectionException;
import java.util.concurrent.Callable;
import java.util.logging.Level;
@AllArgsConstructor
public abstract class RedisCallable<T> implements Callable<T>, Runnable {
private final RedisBungee plugin;
@Override
public void run() {
run(false);
}
@Override
public T call() {
return run(false);
}
private T run(boolean retry) {
Jedis jedis = null;
try {
jedis = plugin.getPool().getResource();
return call(jedis);
} catch (JedisConnectionException e) {
plugin.getLogger().log(Level.SEVERE, "Unable to get connection", e);
if (jedis != null)
plugin.getPool().returnBrokenResource(jedis);
if (!retry) {
// Wait one second before retrying the task
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
throw new RuntimeException("task failed to run", e1);
}
run(true);
}
} finally {
if (jedis != null) {
plugin.getPool().returnResource(jedis);
}
}
throw new RuntimeException("task failed to run");
}
protected abstract T call(Jedis jedis);
}