Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions jpos/src/main/java/org/jpos/iso/ISOSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,23 @@ void send(ISOMsg m)
* @return true if source is connected and usable
*/
boolean isConnected();

/**
* If this ISOSource is connected, this returns true right away. Otherwise, it waits the specified timeout for connection.
*
* @param timeout the time to wait for a connection, in ms
* @return If the ISOSource connected during the specified timeout
*/
default boolean isConnected(long timeout) {
if (isConnected()) return true;
long end = System.nanoTime() + timeout * 1_000_000L;
long sleep = Math.min(500, timeout);
while (sleep > 0 && !Thread.currentThread().isInterrupted()) { // Honor interruptions.
ISOUtil.sleep(sleep);
if (isConnected()) return true;
sleep = Math.min(500, (end - System.nanoTime())/1_000_000L);
}
return false;
}

}
71 changes: 69 additions & 2 deletions jpos/src/main/java/org/jpos/q2/iso/MUXPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand Down Expand Up @@ -290,10 +291,33 @@ public boolean isConnected() {
return false;
}

@Override
public boolean isConnected(long timeout) {
if (isConnected()) return true;
if (timeout <= 0) return isConnected();

CompletableFuture<Boolean> result = new CompletableFuture<>();
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (MUX m : mux) {
executor.execute(() -> {
if (isUsable(m, timeout, executor)) result.complete(true);
});
}
try {
return result.get(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException | TimeoutException _) {
} finally {
executor.shutdownNow();
}
}
return false;
}

@SuppressWarnings("unchecked")
private boolean isUsable (MUX mux) {
if (!checkEnabled || !(mux instanceof QMUX))
return mux.isConnected();
if (!checkEnabled || !(mux instanceof QMUX)) return mux.isConnected();

QMUX qmux = (QMUX) mux;
String enabledKey = qmux.getName() + ".enabled";
Expand All @@ -305,6 +329,49 @@ private boolean isUsable (MUX mux) {
return mux.isConnected() && sp.rdp (enabledKey) != null;
}

@SuppressWarnings("unchecked")
private boolean isUsable(MUX mux, long timeout, ExecutorService executor) {
if (!checkEnabled || !(mux instanceof QMUX qmux)) return mux.isConnected(timeout);

long remaining = timeout;
long end = System.nanoTime() + timeout * 1_000_000L;
String enabledKey = qmux.getName() + ".enabled";
while (remaining > 0 && ! Thread.currentThread().isInterrupted()) { //Honor interruption
final long wait = remaining;
Future<Boolean> muxConnected = executor.submit(() -> mux.isConnected(wait));
Future<Boolean> enabled = executor.submit(() -> {
Object ready = sp.rd(enabledKey, wait);
String[] readyNames = qmux.getReadyIndicatorNames();
if (readyNames != null && readyNames.length == 1) {
// check that 'mux.enabled' entry has the same content as 'ready'
if (ready == sp.rdp (readyNames[0])) {
return true;
} else { // relax for some time and retry
ISOUtil.sleep(Math.clamp((end - System.nanoTime()) / 1_000_000L, 0L, 100L));
return ready == sp.rdp (readyNames[0]);
}
}
return ready != null;
});

try {
if (muxConnected.get(remaining, TimeUnit.MILLISECONDS)
&& enabled.get(remaining, TimeUnit.MILLISECONDS)
&& isUsable(mux)
) return true;
// if both conditions don't meet at the same time, try another round, provided there is time.
remaining = (end - System.nanoTime()) / 1_000_000L;
} catch (ExecutionException | TimeoutException _) {
// Last non-blocking check before failing
return isUsable(mux);
} catch (InterruptedException _) {
Thread.currentThread().interrupt();
return isUsable(mux);
}
}
return false;
}

private String[] toStringArray (String s) {
return (s != null && s.length() > 0) ? ISOUtil.toStringArray(s) : null;
}
Expand Down
31 changes: 28 additions & 3 deletions jpos/src/main/java/org/jpos/q2/iso/QMUX.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@
import java.io.IOException;
import java.io.PrintStream;
import java.util.*;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;

/**
* @author Alejandro Revilla
Expand Down Expand Up @@ -146,7 +145,7 @@ public void destroyService () {
public static MUX getMUX (String name)
throws NameRegistrar.NotFoundException
{
return (MUX) NameRegistrar.get ("mux."+name);
return NameRegistrar.get ("mux."+name);
}

/**
Expand Down Expand Up @@ -512,6 +511,32 @@ public boolean isConnected() {
}
return running();
}

@Override
public boolean isConnected(long timeout) {
if (isConnected()) return true;
if (timeout <= 0) return isConnected();
if (ready == null || ready.length == 0) return running();

CompletableFuture<Boolean> result = new CompletableFuture<>();
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (String r : ready) {
executor.execute(() -> {
if (sp.rd(r, timeout) != null)
result.complete(true);
});
}
try {
return result.get(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException | TimeoutException _) {
} finally {
executor.shutdownNow();
}
}
return false;
}
public void dump (PrintStream p, String indent) {
p.println (indent + getCountersAsString());
metrics.dump (p, indent);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,6 @@ else if (o instanceof Number)
}

protected boolean isConnected (MUX mux) {
if (!checkConnected || mux.isConnected())
return true;
long timeout = System.currentTimeMillis() + waitTimeout;
while (System.currentTimeMillis() < timeout) {
if (mux.isConnected())
return true;
ISOUtil.sleep (500);
}
return false;
return !checkConnected || mux.isConnected(waitTimeout);
}
}
Loading