Contextcontext=vertx.getOrCreateContext();if(context.isEventLoopContext()){System.out.println("Context attached to Event Loop");}elseif(context.isWorkerContext()){System.out.println("Context attached to Worker Thread");}elseif(context.isMultiThreadedWorkerContext()){System.out.println("Context attached to Worker Thread - multi threaded worker");}elseif(!Context.isOnVertxThread()){System.out.println("Context not attached to a thread managed by vert.x");}
if (ctx == null) {
// We are running embedded - Create a context
ctx = createEventLoopContext(null, null, new JsonObject(),
Thread.currentThread().getContextClassLoader());
}
@Override
@SuppressWarnings("unchecked")
public <T> T get(String key) {
return (T) contextData().get(key);
}
@Override
public void put(String key, Object value) {
contextData().put(key, value);
}
@Override
public boolean remove(String key) {
return contextData().remove(key) != null;
}
public synchronized ConcurrentMap<Object, Object> contextData() {
if (contextData == null) {
contextData = new ConcurrentHashMap<>();
}
return contextData;
}
<T> void executeBlocking(Action<T> action, Handler<Future<T>> blockingCodeHandler,
Handler<AsyncResult<T>> resultHandler,
Executor exec, TaskQueue queue, PoolMetrics metrics) {
Object queueMetric = metrics != null ? metrics.submitted() : null;
try {
Runnable command = () -> {
VertxThread current = (VertxThread) Thread.currentThread();
Object execMetric = null;
if (metrics != null) {
execMetric = metrics.begin(queueMetric);
}
if (!DISABLE_TIMINGS) {
current.executeStart();
}
Future<T> res = Future.future();
try {
if (blockingCodeHandler != null) {
ContextImpl.setContext(this);
blockingCodeHandler.handle(res);
} else {
T result = action.perform();
res.complete(result);
}
} catch (Throwable e) {
res.fail(e);
} finally {
if (!DISABLE_TIMINGS) {
current.executeEnd();
}
}
if (metrics != null) {
metrics.end(execMetric, res.succeeded());
}
if (resultHandler != null) {
runOnContext(v -> res.setHandler(resultHandler));
}
};
if (queue != null) {
queue.execute(command, exec);
} else {
exec.execute(command);
}
} catch (RejectedExecutionException e) {
// Pool is already shut down
if (metrics != null) {
metrics.rejected(queueMetric);
}
throw e;
}
}