77
88import java .util .Objects ;
99import java .util .Queue ;
10- import java .util .concurrent .ArrayBlockingQueue ;
1110import java .util .concurrent .atomic .AtomicBoolean ;
1211import java .util .function .Consumer ;
1312import java .util .logging .Level ;
1413import java .util .logging .Logger ;
1514import org .jctools .queues .MessagePassingQueue ;
1615import org .jctools .queues .MpscArrayQueue ;
16+ import org .jctools .queues .atomic .MpscAtomicArrayQueue ;
1717
1818/**
1919 * Internal accessor of JCTools package for fast queues.
@@ -25,11 +25,15 @@ public final class JcTools {
2525
2626 private static final AtomicBoolean queueCreationWarningLogged = new AtomicBoolean ();
2727 private static final Logger logger = Logger .getLogger (JcTools .class .getName ());
28+ private static final boolean PROACTIVELY_AVOID_UNSAFE = proactivelyAvoidUnsafe ();
2829
2930 /**
3031 * Returns a new {@link Queue} appropriate for use with multiple producers and a single consumer.
3132 */
3233 public static <T > Queue <T > newFixedSizeQueue (int capacity ) {
34+ if (PROACTIVELY_AVOID_UNSAFE ) {
35+ return new MpscAtomicArrayQueue <>(capacity );
36+ }
3337 try {
3438 return new MpscArrayQueue <>(capacity );
3539 } catch (java .lang .NoClassDefFoundError | java .lang .ExceptionInInitializerError e ) {
@@ -41,7 +45,7 @@ public static <T> Queue<T> newFixedSizeQueue(int capacity) {
4145 }
4246 // Happens when modules such as jdk.unsupported are disabled in a custom JRE distribution,
4347 // or a security manager preventing access to Unsafe is installed.
44- return new ArrayBlockingQueue <>(capacity );
48+ return new MpscAtomicArrayQueue <>(capacity );
4549 }
4650 }
4751
@@ -50,11 +54,7 @@ public static <T> Queue<T> newFixedSizeQueue(int capacity) {
5054 * to use the shaded classes.
5155 */
5256 public static long capacity (Queue <?> queue ) {
53- if (queue instanceof MessagePassingQueue ) {
54- return ((MessagePassingQueue <?>) queue ).capacity ();
55- } else {
56- return (long ) ((ArrayBlockingQueue <?>) queue ).remainingCapacity () + queue .size ();
57- }
57+ return ((MessagePassingQueue <?>) queue ).capacity ();
5858 }
5959
6060 /**
@@ -65,22 +65,26 @@ public static long capacity(Queue<?> queue) {
6565 */
6666 @ SuppressWarnings ("unchecked" )
6767 public static <T > int drain (Queue <T > queue , int limit , Consumer <T > consumer ) {
68- if (queue instanceof MessagePassingQueue ) {
69- return ((MessagePassingQueue <T >) queue ).drain (consumer ::accept , limit );
70- } else {
71- return drainNonJcQueue (queue , limit , consumer );
72- }
68+ return ((MessagePassingQueue <T >) queue ).drain (consumer ::accept , limit );
7369 }
7470
75- private static <T > int drainNonJcQueue (
76- Queue <T > queue , int maxExportBatchSize , Consumer <T > consumer ) {
77- int polledCount = 0 ;
78- T item ;
79- while (polledCount < maxExportBatchSize && (item = queue .poll ()) != null ) {
80- consumer .accept (item );
81- ++polledCount ;
71+ private static boolean proactivelyAvoidUnsafe () {
72+ double javaVersion = getJavaVersion ();
73+ // Avoid Unsafe on Java 23+ due to JEP-498 deprecation warnings:
74+ // "WARNING: A terminally deprecated method in sun.misc.Unsafe has been called"
75+ return javaVersion >= 23 || javaVersion == -1 ;
76+ }
77+
78+ private static double getJavaVersion () {
79+ String specVersion = System .getProperty ("java.specification.version" );
80+ if (specVersion != null ) {
81+ try {
82+ return Double .parseDouble (specVersion );
83+ } catch (NumberFormatException exception ) {
84+ // ignore
85+ }
8286 }
83- return polledCount ;
87+ return - 1 ;
8488 }
8589
8690 private JcTools () {}
0 commit comments