From c25f4c8d6033355f22c433981d9dfb366b6100af Mon Sep 17 00:00:00 2001 From: BallOfEnergy <66693744+BallOfEnergy1@users.noreply.github.com> Date: Fri, 3 Jan 2025 20:29:17 -0600 Subject: [PATCH] Fix /ntmpackets command registration. Fix small race condition with a ReentrantLock. --- .../com/hbm/commands/CommandPacketInfo.java | 87 +++++++++++------- .../handler/threading/PacketThreading.java | 49 +++++++---- src/main/java/com/hbm/main/MainRegistry.java | 1 + .../java/com/hbm/main/NetworkHandler.java | 88 +++++++++++++++---- .../hbm/tileentity/TileEntityLoadedBase.java | 2 +- 5 files changed, 163 insertions(+), 64 deletions(-) diff --git a/src/main/java/com/hbm/commands/CommandPacketInfo.java b/src/main/java/com/hbm/commands/CommandPacketInfo.java index dec1fbff7..4432e595f 100644 --- a/src/main/java/com/hbm/commands/CommandPacketInfo.java +++ b/src/main/java/com/hbm/commands/CommandPacketInfo.java @@ -2,6 +2,7 @@ package com.hbm.commands; import com.hbm.config.GeneralConfig; import com.hbm.handler.threading.PacketThreading; +import com.hbm.main.MainRegistry; import com.hbm.util.BobMathUtil; import net.minecraft.command.CommandBase; import net.minecraft.command.ICommandSender; @@ -10,6 +11,7 @@ import net.minecraft.util.EnumChatFormatting; import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; +import java.util.concurrent.TimeUnit; import static com.hbm.handler.threading.PacketThreading.totalCnt; @@ -21,48 +23,67 @@ public class CommandPacketInfo extends CommandBase { @Override public String getCommandUsage(ICommandSender sender) { - return "/ntmpackets"; + return EnumChatFormatting.RED + "/ntmpackets [info/resetState/toggleThreadingStatus/forceLock/forceUnlock]"; } @Override public void processCommand(ICommandSender sender, String[] args) { - if(args.length > 0 && args[0].equals("resetState")) { - PacketThreading.hasTriggered = false; - PacketThreading.clearCnt = 0; - } + if (args.length > 0) { + switch (args[0]) { + case "resetState": + PacketThreading.hasTriggered = false; + PacketThreading.clearCnt = 0; + return; + case "toggleThreadingStatus": + GeneralConfig.enablePacketThreading = !GeneralConfig.enablePacketThreading; // Force toggle. + PacketThreading.init(); // Reinit threads. + sender.addChatMessage(new ChatComponentText(EnumChatFormatting.GREEN + "Packet sending status toggled to " + GeneralConfig.enablePacketThreading + ".")); + return; + case "forceLock": + PacketThreading.lock.lock(); // oh my fucking god never do this please unless you really have to + sender.addChatMessage(new ChatComponentText(EnumChatFormatting.RED + "Packet thread lock acquired, this may freeze the main thread!")); + MainRegistry.logger.error("Packet thread lock acquired by {}, this may freeze the main thread!", sender.getCommandSenderName()); + return; + case "forceUnlock": + PacketThreading.lock.unlock(); + MainRegistry.logger.warn("Packet thread lock released by {}.", sender.getCommandSenderName()); + return; + case "info": + sender.addChatMessage(new ChatComponentText(EnumChatFormatting.GOLD + "NTM Packet Debugger v1.2")); - sender.addChatMessage(new ChatComponentText(EnumChatFormatting.GOLD + "NTM Packet Debugger v1.2")); + if (PacketThreading.isTriggered() && GeneralConfig.enablePacketThreading) + sender.addChatMessage(new ChatComponentText(EnumChatFormatting.RED + "Packet Threading Errored, check log.")); + else if (GeneralConfig.enablePacketThreading) + sender.addChatMessage(new ChatComponentText(EnumChatFormatting.GREEN + "Packet Threading Active")); + else + sender.addChatMessage(new ChatComponentText(EnumChatFormatting.RED + "Packet Threading Inactive")); - if(PacketThreading.isTriggered() && GeneralConfig.enablePacketThreading) - sender.addChatMessage(new ChatComponentText(EnumChatFormatting.RED + "Packet Threading Errored, check log.")); - else if(GeneralConfig.enablePacketThreading) - sender.addChatMessage(new ChatComponentText(EnumChatFormatting.GREEN + "Packet Threading Active")); - else - sender.addChatMessage(new ChatComponentText(EnumChatFormatting.RED + "Packet Threading Inactive")); + sender.addChatMessage(new ChatComponentText(EnumChatFormatting.YELLOW + "Thread Pool Info")); + sender.addChatMessage(new ChatComponentText(EnumChatFormatting.YELLOW + "# Threads (total): " + PacketThreading.threadPool.getPoolSize())); + sender.addChatMessage(new ChatComponentText(EnumChatFormatting.YELLOW + "# Threads (core): " + PacketThreading.threadPool.getCorePoolSize())); + sender.addChatMessage(new ChatComponentText(EnumChatFormatting.YELLOW + "# Threads (idle): " + (PacketThreading.threadPool.getPoolSize() - PacketThreading.threadPool.getActiveCount()))); + sender.addChatMessage(new ChatComponentText(EnumChatFormatting.YELLOW + "# Threads (maximum): " + PacketThreading.threadPool.getMaximumPoolSize())); - sender.addChatMessage(new ChatComponentText(EnumChatFormatting.YELLOW + "Thread Pool Info")); - sender.addChatMessage(new ChatComponentText(EnumChatFormatting.YELLOW + "# Threads (total): " + PacketThreading.threadPool.getPoolSize())); - sender.addChatMessage(new ChatComponentText(EnumChatFormatting.YELLOW + "# Threads (core): " + PacketThreading.threadPool.getCorePoolSize())); - sender.addChatMessage(new ChatComponentText(EnumChatFormatting.YELLOW + "# Threads (idle): " + (PacketThreading.threadPool.getPoolSize() - PacketThreading.threadPool.getActiveCount()))); - sender.addChatMessage(new ChatComponentText(EnumChatFormatting.YELLOW + "# Threads (maximum): " + PacketThreading.threadPool.getMaximumPoolSize())); + for (ThreadInfo thread : ManagementFactory.getThreadMXBean().dumpAllThreads(false, false)) + if (thread.getThreadName().startsWith(PacketThreading.threadPrefix)) { + sender.addChatMessage(new ChatComponentText(EnumChatFormatting.GOLD + "Thread Name: " + thread.getThreadName())); + sender.addChatMessage(new ChatComponentText(EnumChatFormatting.YELLOW + "Thread ID: " + thread.getThreadId())); + sender.addChatMessage(new ChatComponentText(EnumChatFormatting.YELLOW + "Thread state: " + thread.getThreadState())); + sender.addChatMessage(new ChatComponentText(EnumChatFormatting.YELLOW + "Locked by: " + (thread.getLockOwnerName() == null ? "None" : thread.getLockName()))); + } - for(ThreadInfo thread : ManagementFactory.getThreadMXBean().dumpAllThreads(false, false)) - if(thread.getThreadName().startsWith("NTM-Packet-Thread-")) { - sender.addChatMessage(new ChatComponentText(EnumChatFormatting.GOLD + "Thread Name: " + thread.getThreadName())); - sender.addChatMessage(new ChatComponentText(EnumChatFormatting.YELLOW + "Thread ID: " + thread.getThreadId())); - sender.addChatMessage(new ChatComponentText(EnumChatFormatting.YELLOW + "Thread state: " + thread.getThreadState())); - sender.addChatMessage(new ChatComponentText(EnumChatFormatting.YELLOW + "Locked by: " + (thread.getLockOwnerName() == null ? "None" : thread.getLockName()))); + sender.addChatMessage(new ChatComponentText(EnumChatFormatting.GOLD + "Packet Info: ")); + sender.addChatMessage(new ChatComponentText(EnumChatFormatting.YELLOW + "Amount total: " + totalCnt)); + sender.addChatMessage(new ChatComponentText(EnumChatFormatting.YELLOW + "Amount remaining: " + PacketThreading.threadPool.getQueue().size())); + + if (totalCnt != 0) + sender.addChatMessage(new ChatComponentText(EnumChatFormatting.YELLOW + "% Remaining to process: " + BobMathUtil.roundDecimal(((double) PacketThreading.threadPool.getQueue().size() / totalCnt) * 100, 2) + "%")); + + sender.addChatMessage(new ChatComponentText(EnumChatFormatting.YELLOW + "Time spent waiting on thread(s) last tick: " + BobMathUtil.roundDecimal(TimeUnit.NANOSECONDS.convert(PacketThreading.nanoTimeWaited, TimeUnit.MILLISECONDS), 4) + "ms")); + return; } - - sender.addChatMessage(new ChatComponentText(EnumChatFormatting.GOLD + "Packet Info: ")); - sender.addChatMessage(new ChatComponentText(EnumChatFormatting.YELLOW + "Amount total: " + totalCnt)); - sender.addChatMessage(new ChatComponentText(EnumChatFormatting.YELLOW + "Amount remaining: " + PacketThreading.threadPool.getQueue().size())); - - if(totalCnt != 0) - sender.addChatMessage(new ChatComponentText(EnumChatFormatting.YELLOW + "% Remaining to process: " + BobMathUtil.roundDecimal(((double) PacketThreading.threadPool.getQueue().size() / totalCnt) * 100, 2) + "%")); - - sender.addChatMessage(new ChatComponentText(EnumChatFormatting.YELLOW + "Time spent waiting on thread(s) last tick: " + BobMathUtil.roundDecimal(PacketThreading.nanoTimeWaited / 1000000d, 4) + "ms")); - + } + sender.addChatMessage(new ChatComponentText(getCommandUsage(sender))); } } diff --git a/src/main/java/com/hbm/handler/threading/PacketThreading.java b/src/main/java/com/hbm/handler/threading/PacketThreading.java index 7392e59d8..438481d52 100644 --- a/src/main/java/com/hbm/handler/threading/PacketThreading.java +++ b/src/main/java/com/hbm/handler/threading/PacketThreading.java @@ -11,10 +11,13 @@ import cpw.mods.fml.common.network.simpleimpl.IMessage; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; +import java.util.concurrent.locks.ReentrantLock; public class PacketThreading { - private static final ThreadFactory packetThreadFactory = new ThreadFactoryBuilder().setNameFormat("NTM-Packet-Thread-%d").build(); + public static final String threadPrefix = "NTM-Packet-Thread-"; + + public static final ThreadFactory packetThreadFactory = new ThreadFactoryBuilder().setNameFormat(threadPrefix + "%d").build(); public static final ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1, packetThreadFactory); @@ -24,17 +27,19 @@ public class PacketThreading { public static final List> futureList = new ArrayList<>(); + public static ReentrantLock lock = new ReentrantLock(); + /** * Sets up thread pool settings during mod initialization. */ public static void init() { threadPool.setKeepAliveTime(50, TimeUnit.MILLISECONDS); - if(GeneralConfig.enablePacketThreading) { - if(GeneralConfig.packetThreadingCoreCount < 0 || GeneralConfig.packetThreadingMaxCount <= 0) { + if (GeneralConfig.enablePacketThreading) { + if (GeneralConfig.packetThreadingCoreCount < 0 || GeneralConfig.packetThreadingMaxCount <= 0) { MainRegistry.logger.error("0.02_packetThreadingCoreCount < 0 or 0.03_packetThreadingMaxCount is <= 0, defaulting to 1 each."); threadPool.setCorePoolSize(1); // beugh threadPool.setMaximumPoolSize(1); - } else if(GeneralConfig.packetThreadingMaxCount > GeneralConfig.packetThreadingCoreCount) { + } else if (GeneralConfig.packetThreadingMaxCount > GeneralConfig.packetThreadingCoreCount) { MainRegistry.logger.error("0.03_packetThreadingMaxCount is > 0.02_packetThreadingCoreCount, defaulting to 1 each."); threadPool.setCorePoolSize(1); threadPool.setMaximumPoolSize(1); @@ -45,8 +50,14 @@ public class PacketThreading { threadPool.allowCoreThreadTimeOut(false); } else { threadPool.allowCoreThreadTimeOut(true); - for(Runnable task : threadPool.getQueue()) { - task.run(); // Run all tasks async just in-case there *are* tasks left to run. + try { + lock.lock(); + for (Runnable task : threadPool.getQueue()) { + task.run(); // Run all tasks async just in-case there *are* tasks left to run. + } + clearThreadPoolTasks(); + } finally { + lock.unlock(); } } } @@ -56,8 +67,7 @@ public class PacketThreading { * @param message Message to process. * @param target TargetPoint to send to. */ - public static void createThreadedPacket(IMessage message, TargetPoint target) { - + public static void createAllAroundThreadedPacket(IMessage message, TargetPoint target) { // `message` can be precompiled or not. if(message instanceof PrecompiledPacket) ((PrecompiledPacket) message).getPreBuf(); // Gets the precompiled buffer, doing nothing if it already exists. @@ -65,17 +75,26 @@ public class PacketThreading { totalCnt++; Runnable task = () -> { - PacketDispatcher.wrapper.sendToAllAround(message, target); - if(message instanceof PrecompiledPacket) - ((PrecompiledPacket) message).getPreBuf().release(); + try { + lock.lock(); + PacketDispatcher.wrapper.sendToAllAround(message, target); + if (message instanceof PrecompiledPacket) + ((PrecompiledPacket) message).getPreBuf().release(); + } finally { + lock.unlock(); + } }; + addTask(task); + } + + private static void addTask(Runnable task) { if(isTriggered()) task.run(); else if(GeneralConfig.enablePacketThreading) - futureList.add(threadPool.submit(task)); // Thread it + futureList.add(threadPool.submit(task)); else - task.run(); // no threading :( + task.run(); } /** @@ -86,7 +105,9 @@ public class PacketThreading { try { if (GeneralConfig.enablePacketThreading && (!GeneralConfig.packetThreadingErrorBypass && !hasTriggered)) { for (Future future : futureList) { + nanoTimeWaited = System.nanoTime() - startTime; future.get(50, TimeUnit.MILLISECONDS); // I HATE EVERYTHING + if(TimeUnit.NANOSECONDS.convert(nanoTimeWaited, TimeUnit.MILLISECONDS) > 50) throw new TimeoutException(); // >50ms total time? timeout? yes sir, ooh rah! } } } catch (ExecutionException ignored) { @@ -99,7 +120,6 @@ public class PacketThreading { Thread.currentThread().interrupt(); // maybe not the best thing but it's gotta be here } finally { futureList.clear(); - nanoTimeWaited = System.nanoTime() - startTime; if(!threadPool.getQueue().isEmpty()) { if(!GeneralConfig.packetThreadingErrorBypass && !hasTriggered) MainRegistry.logger.warn("Residual packets in packet threading queue detected, discarding {}/{} packets.", threadPool.getQueue().size(), totalCnt); @@ -128,7 +148,6 @@ public class PacketThreading { clearCnt++; - if(clearCnt > 5 && !isTriggered()) { // If it's been cleared 5 times in a row, something may have gone really wrong. // Best case scenario here, the server is lagging terribly, has a bad CPU, or has a poor network connection diff --git a/src/main/java/com/hbm/main/MainRegistry.java b/src/main/java/com/hbm/main/MainRegistry.java index a368296a6..8f818c9a0 100644 --- a/src/main/java/com/hbm/main/MainRegistry.java +++ b/src/main/java/com/hbm/main/MainRegistry.java @@ -936,6 +936,7 @@ public class MainRegistry { event.registerServerCommand(new CommandDebugChunkLoad()); event.registerServerCommand(new CommandSatellites()); event.registerServerCommand(new CommandRadiation()); + event.registerServerCommand(new CommandPacketInfo()); } @EventHandler diff --git a/src/main/java/com/hbm/main/NetworkHandler.java b/src/main/java/com/hbm/main/NetworkHandler.java index bd9caa725..e7dad9ced 100644 --- a/src/main/java/com/hbm/main/NetworkHandler.java +++ b/src/main/java/com/hbm/main/NetworkHandler.java @@ -1,5 +1,6 @@ package com.hbm.main; +import com.hbm.handler.threading.PacketThreading; import com.hbm.packet.PrecompiledPacket; import cpw.mods.fml.common.network.FMLEmbeddedChannel; import cpw.mods.fml.common.network.FMLOutboundHandler; @@ -20,6 +21,9 @@ import io.netty.handler.codec.CodecException; import io.netty.handler.codec.MessageToMessageCodec; import net.minecraft.entity.player.EntityPlayerMP; +import javax.management.MXBean; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; import java.lang.ref.WeakReference; import java.util.EnumMap; import java.util.List; @@ -124,37 +128,91 @@ public class NetworkHandler { serverChannel.flush(); } - public void sendToServer(IMessage message) { + public void sendToServer(IMessage message) { // No thread protection needed here, since the client never threads packets to the server. clientChannel.attr(FMLOutboundHandler.FML_MESSAGETARGET).set(FMLOutboundHandler.OutboundTarget.TOSERVER); clientChannel.write(message); } public void sendToDimension(IMessage message, int dimensionId) { - serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGET).set(FMLOutboundHandler.OutboundTarget.DIMENSION); - serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGETARGS).set(dimensionId); - serverChannel.write(message); + if(!Thread.currentThread().getName().contains(PacketThreading.threadPrefix)) { + try { + PacketThreading.lock.lock(); + serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGET).set(FMLOutboundHandler.OutboundTarget.DIMENSION); + serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGETARGS).set(dimensionId); + serverChannel.write(message); + } finally { + PacketThreading.lock.unlock(); + } + } else { + serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGET).set(FMLOutboundHandler.OutboundTarget.DIMENSION); + serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGETARGS).set(dimensionId); + serverChannel.write(message); + } } public void sendToAllAround(IMessage message, NetworkRegistry.TargetPoint point) { - serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGET).set(FMLOutboundHandler.OutboundTarget.ALLAROUNDPOINT); - serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGETARGS).set(point); - serverChannel.write(message); + if(!Thread.currentThread().getName().contains(PacketThreading.threadPrefix)) { + try { + PacketThreading.lock.lock(); + serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGET).set(FMLOutboundHandler.OutboundTarget.ALLAROUNDPOINT); + serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGETARGS).set(point); + serverChannel.write(message); + } finally { + PacketThreading.lock.unlock(); + } + } else { + serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGET).set(FMLOutboundHandler.OutboundTarget.ALLAROUNDPOINT); + serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGETARGS).set(point); + serverChannel.write(message); + } } public void sendToAllAround(ByteBuf message, NetworkRegistry.TargetPoint point) { - serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGET).set(FMLOutboundHandler.OutboundTarget.ALLAROUNDPOINT); - serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGETARGS).set(point); - serverChannel.write(message); + if(!Thread.currentThread().getName().contains(PacketThreading.threadPrefix)) { + try { + PacketThreading.lock.lock(); + serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGET).set(FMLOutboundHandler.OutboundTarget.ALLAROUNDPOINT); + serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGETARGS).set(point); + serverChannel.write(message); + } finally { + PacketThreading.lock.unlock(); + } + } else { + serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGET).set(FMLOutboundHandler.OutboundTarget.ALLAROUNDPOINT); + serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGETARGS).set(point); + serverChannel.write(message); + } } public void sendTo(IMessage message, EntityPlayerMP player) { - serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGET).set(FMLOutboundHandler.OutboundTarget.PLAYER); - serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGETARGS).set(player); - serverChannel.write(message); + if(!Thread.currentThread().getName().contains(PacketThreading.threadPrefix)) { + try { + PacketThreading.lock.lock(); + serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGET).set(FMLOutboundHandler.OutboundTarget.PLAYER); + serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGETARGS).set(player); + serverChannel.write(message); + } finally { + PacketThreading.lock.unlock(); + } + } else { + serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGET).set(FMLOutboundHandler.OutboundTarget.PLAYER); + serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGETARGS).set(player); + serverChannel.write(message); + } } public void sendToAll(IMessage message) { - serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGET).set(FMLOutboundHandler.OutboundTarget.ALL); - serverChannel.write(message); + if(!Thread.currentThread().getName().contains(PacketThreading.threadPrefix)) { + try { + PacketThreading.lock.lock(); + serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGET).set(FMLOutboundHandler.OutboundTarget.ALL); + serverChannel.write(message); + } finally { + PacketThreading.lock.unlock(); + } + } else { + serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGET).set(FMLOutboundHandler.OutboundTarget.ALL); + serverChannel.write(message); + } } } diff --git a/src/main/java/com/hbm/tileentity/TileEntityLoadedBase.java b/src/main/java/com/hbm/tileentity/TileEntityLoadedBase.java index 6658452b7..74b61d7b0 100644 --- a/src/main/java/com/hbm/tileentity/TileEntityLoadedBase.java +++ b/src/main/java/com/hbm/tileentity/TileEntityLoadedBase.java @@ -86,7 +86,7 @@ public class TileEntityLoadedBase extends TileEntity implements ILoadedTile, IBu this.lastPackedBuf = preBuf.copy(); - PacketThreading.createThreadedPacket(packet, new NetworkRegistry.TargetPoint(this.worldObj.provider.dimensionId, xCoord, yCoord, zCoord, range)); + PacketThreading.createAllAroundThreadedPacket(packet, new NetworkRegistry.TargetPoint(this.worldObj.provider.dimensionId, xCoord, yCoord, zCoord, range)); } }