Fix /ntmpackets command registration.

Fix small race condition with a ReentrantLock.
This commit is contained in:
BallOfEnergy 2025-01-03 20:29:17 -06:00
parent 9a22c343c9
commit c25f4c8d60
5 changed files with 163 additions and 64 deletions

View File

@ -2,6 +2,7 @@ package com.hbm.commands;
import com.hbm.config.GeneralConfig; import com.hbm.config.GeneralConfig;
import com.hbm.handler.threading.PacketThreading; import com.hbm.handler.threading.PacketThreading;
import com.hbm.main.MainRegistry;
import com.hbm.util.BobMathUtil; import com.hbm.util.BobMathUtil;
import net.minecraft.command.CommandBase; import net.minecraft.command.CommandBase;
import net.minecraft.command.ICommandSender; import net.minecraft.command.ICommandSender;
@ -10,6 +11,7 @@ import net.minecraft.util.EnumChatFormatting;
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo; import java.lang.management.ThreadInfo;
import java.util.concurrent.TimeUnit;
import static com.hbm.handler.threading.PacketThreading.totalCnt; import static com.hbm.handler.threading.PacketThreading.totalCnt;
@ -21,17 +23,33 @@ public class CommandPacketInfo extends CommandBase {
@Override @Override
public String getCommandUsage(ICommandSender sender) { public String getCommandUsage(ICommandSender sender) {
return "/ntmpackets"; return EnumChatFormatting.RED + "/ntmpackets [info/resetState/toggleThreadingStatus/forceLock/forceUnlock]";
} }
@Override @Override
public void processCommand(ICommandSender sender, String[] args) { public void processCommand(ICommandSender sender, String[] args) {
if(args.length > 0 && args[0].equals("resetState")) { if (args.length > 0) {
switch (args[0]) {
case "resetState":
PacketThreading.hasTriggered = false; PacketThreading.hasTriggered = false;
PacketThreading.clearCnt = 0; PacketThreading.clearCnt = 0;
} return;
case "toggleThreadingStatus":
GeneralConfig.enablePacketThreading = !GeneralConfig.enablePacketThreading; // Force toggle.
PacketThreading.init(); // Reinit threads.
sender.addChatMessage(new ChatComponentText(EnumChatFormatting.GREEN + "Packet sending status toggled to " + GeneralConfig.enablePacketThreading + "."));
return;
case "forceLock":
PacketThreading.lock.lock(); // oh my fucking god never do this please unless you really have to
sender.addChatMessage(new ChatComponentText(EnumChatFormatting.RED + "Packet thread lock acquired, this may freeze the main thread!"));
MainRegistry.logger.error("Packet thread lock acquired by {}, this may freeze the main thread!", sender.getCommandSenderName());
return;
case "forceUnlock":
PacketThreading.lock.unlock();
MainRegistry.logger.warn("Packet thread lock released by {}.", sender.getCommandSenderName());
return;
case "info":
sender.addChatMessage(new ChatComponentText(EnumChatFormatting.GOLD + "NTM Packet Debugger v1.2")); sender.addChatMessage(new ChatComponentText(EnumChatFormatting.GOLD + "NTM Packet Debugger v1.2"));
if (PacketThreading.isTriggered() && GeneralConfig.enablePacketThreading) if (PacketThreading.isTriggered() && GeneralConfig.enablePacketThreading)
@ -48,7 +66,7 @@ public class CommandPacketInfo extends CommandBase {
sender.addChatMessage(new ChatComponentText(EnumChatFormatting.YELLOW + "# Threads (maximum): " + PacketThreading.threadPool.getMaximumPoolSize())); sender.addChatMessage(new ChatComponentText(EnumChatFormatting.YELLOW + "# Threads (maximum): " + PacketThreading.threadPool.getMaximumPoolSize()));
for (ThreadInfo thread : ManagementFactory.getThreadMXBean().dumpAllThreads(false, false)) for (ThreadInfo thread : ManagementFactory.getThreadMXBean().dumpAllThreads(false, false))
if(thread.getThreadName().startsWith("NTM-Packet-Thread-")) { if (thread.getThreadName().startsWith(PacketThreading.threadPrefix)) {
sender.addChatMessage(new ChatComponentText(EnumChatFormatting.GOLD + "Thread Name: " + thread.getThreadName())); sender.addChatMessage(new ChatComponentText(EnumChatFormatting.GOLD + "Thread Name: " + thread.getThreadName()));
sender.addChatMessage(new ChatComponentText(EnumChatFormatting.YELLOW + "Thread ID: " + thread.getThreadId())); sender.addChatMessage(new ChatComponentText(EnumChatFormatting.YELLOW + "Thread ID: " + thread.getThreadId()));
sender.addChatMessage(new ChatComponentText(EnumChatFormatting.YELLOW + "Thread state: " + thread.getThreadState())); sender.addChatMessage(new ChatComponentText(EnumChatFormatting.YELLOW + "Thread state: " + thread.getThreadState()));
@ -62,7 +80,10 @@ public class CommandPacketInfo extends CommandBase {
if (totalCnt != 0) if (totalCnt != 0)
sender.addChatMessage(new ChatComponentText(EnumChatFormatting.YELLOW + "% Remaining to process: " + BobMathUtil.roundDecimal(((double) PacketThreading.threadPool.getQueue().size() / totalCnt) * 100, 2) + "%")); sender.addChatMessage(new ChatComponentText(EnumChatFormatting.YELLOW + "% Remaining to process: " + BobMathUtil.roundDecimal(((double) PacketThreading.threadPool.getQueue().size() / totalCnt) * 100, 2) + "%"));
sender.addChatMessage(new ChatComponentText(EnumChatFormatting.YELLOW + "Time spent waiting on thread(s) last tick: " + BobMathUtil.roundDecimal(PacketThreading.nanoTimeWaited / 1000000d, 4) + "ms")); sender.addChatMessage(new ChatComponentText(EnumChatFormatting.YELLOW + "Time spent waiting on thread(s) last tick: " + BobMathUtil.roundDecimal(TimeUnit.NANOSECONDS.convert(PacketThreading.nanoTimeWaited, TimeUnit.MILLISECONDS), 4) + "ms"));
return;
}
}
sender.addChatMessage(new ChatComponentText(getCommandUsage(sender)));
} }
} }

View File

@ -11,10 +11,13 @@ import cpw.mods.fml.common.network.simpleimpl.IMessage;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
public class PacketThreading { public class PacketThreading {
private static final ThreadFactory packetThreadFactory = new ThreadFactoryBuilder().setNameFormat("NTM-Packet-Thread-%d").build(); public static final String threadPrefix = "NTM-Packet-Thread-";
public static final ThreadFactory packetThreadFactory = new ThreadFactoryBuilder().setNameFormat(threadPrefix + "%d").build();
public static final ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1, packetThreadFactory); public static final ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1, packetThreadFactory);
@ -24,6 +27,8 @@ public class PacketThreading {
public static final List<Future<?>> futureList = new ArrayList<>(); public static final List<Future<?>> futureList = new ArrayList<>();
public static ReentrantLock lock = new ReentrantLock();
/** /**
* Sets up thread pool settings during mod initialization. * Sets up thread pool settings during mod initialization.
*/ */
@ -45,9 +50,15 @@ public class PacketThreading {
threadPool.allowCoreThreadTimeOut(false); threadPool.allowCoreThreadTimeOut(false);
} else { } else {
threadPool.allowCoreThreadTimeOut(true); threadPool.allowCoreThreadTimeOut(true);
try {
lock.lock();
for (Runnable task : threadPool.getQueue()) { for (Runnable task : threadPool.getQueue()) {
task.run(); // Run all tasks async just in-case there *are* tasks left to run. task.run(); // Run all tasks async just in-case there *are* tasks left to run.
} }
clearThreadPoolTasks();
} finally {
lock.unlock();
}
} }
} }
@ -56,8 +67,7 @@ public class PacketThreading {
* @param message Message to process. * @param message Message to process.
* @param target TargetPoint to send to. * @param target TargetPoint to send to.
*/ */
public static void createThreadedPacket(IMessage message, TargetPoint target) { public static void createAllAroundThreadedPacket(IMessage message, TargetPoint target) {
// `message` can be precompiled or not. // `message` can be precompiled or not.
if(message instanceof PrecompiledPacket) if(message instanceof PrecompiledPacket)
((PrecompiledPacket) message).getPreBuf(); // Gets the precompiled buffer, doing nothing if it already exists. ((PrecompiledPacket) message).getPreBuf(); // Gets the precompiled buffer, doing nothing if it already exists.
@ -65,17 +75,26 @@ public class PacketThreading {
totalCnt++; totalCnt++;
Runnable task = () -> { Runnable task = () -> {
try {
lock.lock();
PacketDispatcher.wrapper.sendToAllAround(message, target); PacketDispatcher.wrapper.sendToAllAround(message, target);
if (message instanceof PrecompiledPacket) if (message instanceof PrecompiledPacket)
((PrecompiledPacket) message).getPreBuf().release(); ((PrecompiledPacket) message).getPreBuf().release();
} finally {
lock.unlock();
}
}; };
addTask(task);
}
private static void addTask(Runnable task) {
if(isTriggered()) if(isTriggered())
task.run(); task.run();
else if(GeneralConfig.enablePacketThreading) else if(GeneralConfig.enablePacketThreading)
futureList.add(threadPool.submit(task)); // Thread it futureList.add(threadPool.submit(task));
else else
task.run(); // no threading :( task.run();
} }
/** /**
@ -86,7 +105,9 @@ public class PacketThreading {
try { try {
if (GeneralConfig.enablePacketThreading && (!GeneralConfig.packetThreadingErrorBypass && !hasTriggered)) { if (GeneralConfig.enablePacketThreading && (!GeneralConfig.packetThreadingErrorBypass && !hasTriggered)) {
for (Future<?> future : futureList) { for (Future<?> future : futureList) {
nanoTimeWaited = System.nanoTime() - startTime;
future.get(50, TimeUnit.MILLISECONDS); // I HATE EVERYTHING future.get(50, TimeUnit.MILLISECONDS); // I HATE EVERYTHING
if(TimeUnit.NANOSECONDS.convert(nanoTimeWaited, TimeUnit.MILLISECONDS) > 50) throw new TimeoutException(); // >50ms total time? timeout? yes sir, ooh rah!
} }
} }
} catch (ExecutionException ignored) { } catch (ExecutionException ignored) {
@ -99,7 +120,6 @@ public class PacketThreading {
Thread.currentThread().interrupt(); // maybe not the best thing but it's gotta be here Thread.currentThread().interrupt(); // maybe not the best thing but it's gotta be here
} finally { } finally {
futureList.clear(); futureList.clear();
nanoTimeWaited = System.nanoTime() - startTime;
if(!threadPool.getQueue().isEmpty()) { if(!threadPool.getQueue().isEmpty()) {
if(!GeneralConfig.packetThreadingErrorBypass && !hasTriggered) if(!GeneralConfig.packetThreadingErrorBypass && !hasTriggered)
MainRegistry.logger.warn("Residual packets in packet threading queue detected, discarding {}/{} packets.", threadPool.getQueue().size(), totalCnt); MainRegistry.logger.warn("Residual packets in packet threading queue detected, discarding {}/{} packets.", threadPool.getQueue().size(), totalCnt);
@ -128,7 +148,6 @@ public class PacketThreading {
clearCnt++; clearCnt++;
if(clearCnt > 5 && !isTriggered()) { if(clearCnt > 5 && !isTriggered()) {
// If it's been cleared 5 times in a row, something may have gone really wrong. // If it's been cleared 5 times in a row, something may have gone really wrong.
// Best case scenario here, the server is lagging terribly, has a bad CPU, or has a poor network connection // Best case scenario here, the server is lagging terribly, has a bad CPU, or has a poor network connection

View File

@ -936,6 +936,7 @@ public class MainRegistry {
event.registerServerCommand(new CommandDebugChunkLoad()); event.registerServerCommand(new CommandDebugChunkLoad());
event.registerServerCommand(new CommandSatellites()); event.registerServerCommand(new CommandSatellites());
event.registerServerCommand(new CommandRadiation()); event.registerServerCommand(new CommandRadiation());
event.registerServerCommand(new CommandPacketInfo());
} }
@EventHandler @EventHandler

View File

@ -1,5 +1,6 @@
package com.hbm.main; package com.hbm.main;
import com.hbm.handler.threading.PacketThreading;
import com.hbm.packet.PrecompiledPacket; import com.hbm.packet.PrecompiledPacket;
import cpw.mods.fml.common.network.FMLEmbeddedChannel; import cpw.mods.fml.common.network.FMLEmbeddedChannel;
import cpw.mods.fml.common.network.FMLOutboundHandler; import cpw.mods.fml.common.network.FMLOutboundHandler;
@ -20,6 +21,9 @@ import io.netty.handler.codec.CodecException;
import io.netty.handler.codec.MessageToMessageCodec; import io.netty.handler.codec.MessageToMessageCodec;
import net.minecraft.entity.player.EntityPlayerMP; import net.minecraft.entity.player.EntityPlayerMP;
import javax.management.MXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.ref.WeakReference; import java.lang.ref.WeakReference;
import java.util.EnumMap; import java.util.EnumMap;
import java.util.List; import java.util.List;
@ -124,37 +128,91 @@ public class NetworkHandler {
serverChannel.flush(); serverChannel.flush();
} }
public void sendToServer(IMessage message) { public void sendToServer(IMessage message) { // No thread protection needed here, since the client never threads packets to the server.
clientChannel.attr(FMLOutboundHandler.FML_MESSAGETARGET).set(FMLOutboundHandler.OutboundTarget.TOSERVER); clientChannel.attr(FMLOutboundHandler.FML_MESSAGETARGET).set(FMLOutboundHandler.OutboundTarget.TOSERVER);
clientChannel.write(message); clientChannel.write(message);
} }
public void sendToDimension(IMessage message, int dimensionId) { public void sendToDimension(IMessage message, int dimensionId) {
if(!Thread.currentThread().getName().contains(PacketThreading.threadPrefix)) {
try {
PacketThreading.lock.lock();
serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGET).set(FMLOutboundHandler.OutboundTarget.DIMENSION);
serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGETARGS).set(dimensionId);
serverChannel.write(message);
} finally {
PacketThreading.lock.unlock();
}
} else {
serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGET).set(FMLOutboundHandler.OutboundTarget.DIMENSION); serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGET).set(FMLOutboundHandler.OutboundTarget.DIMENSION);
serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGETARGS).set(dimensionId); serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGETARGS).set(dimensionId);
serverChannel.write(message); serverChannel.write(message);
} }
}
public void sendToAllAround(IMessage message, NetworkRegistry.TargetPoint point) { public void sendToAllAround(IMessage message, NetworkRegistry.TargetPoint point) {
if(!Thread.currentThread().getName().contains(PacketThreading.threadPrefix)) {
try {
PacketThreading.lock.lock();
serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGET).set(FMLOutboundHandler.OutboundTarget.ALLAROUNDPOINT); serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGET).set(FMLOutboundHandler.OutboundTarget.ALLAROUNDPOINT);
serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGETARGS).set(point); serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGETARGS).set(point);
serverChannel.write(message); serverChannel.write(message);
} finally {
PacketThreading.lock.unlock();
}
} else {
serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGET).set(FMLOutboundHandler.OutboundTarget.ALLAROUNDPOINT);
serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGETARGS).set(point);
serverChannel.write(message);
}
} }
public void sendToAllAround(ByteBuf message, NetworkRegistry.TargetPoint point) { public void sendToAllAround(ByteBuf message, NetworkRegistry.TargetPoint point) {
if(!Thread.currentThread().getName().contains(PacketThreading.threadPrefix)) {
try {
PacketThreading.lock.lock();
serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGET).set(FMLOutboundHandler.OutboundTarget.ALLAROUNDPOINT);
serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGETARGS).set(point);
serverChannel.write(message);
} finally {
PacketThreading.lock.unlock();
}
} else {
serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGET).set(FMLOutboundHandler.OutboundTarget.ALLAROUNDPOINT); serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGET).set(FMLOutboundHandler.OutboundTarget.ALLAROUNDPOINT);
serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGETARGS).set(point); serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGETARGS).set(point);
serverChannel.write(message); serverChannel.write(message);
} }
}
public void sendTo(IMessage message, EntityPlayerMP player) { public void sendTo(IMessage message, EntityPlayerMP player) {
if(!Thread.currentThread().getName().contains(PacketThreading.threadPrefix)) {
try {
PacketThreading.lock.lock();
serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGET).set(FMLOutboundHandler.OutboundTarget.PLAYER);
serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGETARGS).set(player);
serverChannel.write(message);
} finally {
PacketThreading.lock.unlock();
}
} else {
serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGET).set(FMLOutboundHandler.OutboundTarget.PLAYER); serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGET).set(FMLOutboundHandler.OutboundTarget.PLAYER);
serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGETARGS).set(player); serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGETARGS).set(player);
serverChannel.write(message); serverChannel.write(message);
} }
}
public void sendToAll(IMessage message) { public void sendToAll(IMessage message) {
if(!Thread.currentThread().getName().contains(PacketThreading.threadPrefix)) {
try {
PacketThreading.lock.lock();
serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGET).set(FMLOutboundHandler.OutboundTarget.ALL);
serverChannel.write(message);
} finally {
PacketThreading.lock.unlock();
}
} else {
serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGET).set(FMLOutboundHandler.OutboundTarget.ALL); serverChannel.attr(FMLOutboundHandler.FML_MESSAGETARGET).set(FMLOutboundHandler.OutboundTarget.ALL);
serverChannel.write(message); serverChannel.write(message);
} }
} }
}

View File

@ -86,7 +86,7 @@ public class TileEntityLoadedBase extends TileEntity implements ILoadedTile, IBu
this.lastPackedBuf = preBuf.copy(); this.lastPackedBuf = preBuf.copy();
PacketThreading.createThreadedPacket(packet, new NetworkRegistry.TargetPoint(this.worldObj.provider.dimensionId, xCoord, yCoord, zCoord, range)); PacketThreading.createAllAroundThreadedPacket(packet, new NetworkRegistry.TargetPoint(this.worldObj.provider.dimensionId, xCoord, yCoord, zCoord, range));
} }
} }