001/* 002 * Copyright 2013 Anyware Services 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016package org.ametys.core.observation; 017 018import java.util.ArrayList; 019import java.util.Collection; 020import java.util.Collections; 021import java.util.Comparator; 022import java.util.HashMap; 023import java.util.HashSet; 024import java.util.List; 025import java.util.Map; 026import java.util.Optional; 027import java.util.Set; 028import java.util.concurrent.Callable; 029import java.util.concurrent.ExecutorService; 030import java.util.concurrent.Executors; 031import java.util.concurrent.Future; 032import java.util.concurrent.ThreadFactory; 033import java.util.concurrent.atomic.AtomicLong; 034 035import org.apache.avalon.framework.CascadingRuntimeException; 036import org.apache.avalon.framework.activity.Disposable; 037import org.apache.avalon.framework.component.Component; 038import org.apache.avalon.framework.context.ContextException; 039import org.apache.avalon.framework.context.Contextualizable; 040import org.apache.avalon.framework.logger.AbstractLogEnabled; 041import org.apache.avalon.framework.service.ServiceException; 042import org.apache.avalon.framework.service.ServiceManager; 043import org.apache.avalon.framework.service.Serviceable; 044import org.apache.cocoon.Constants; 045import org.apache.cocoon.components.ContextHelper; 046import org.apache.cocoon.environment.Context; 047import org.apache.cocoon.environment.ObjectModelHelper; 048import org.apache.cocoon.environment.Request; 049import org.slf4j.Logger; 050import org.slf4j.LoggerFactory; 051 052import org.ametys.core.authentication.AuthenticateAction; 053import org.ametys.core.engine.BackgroundEngineHelper; 054import org.ametys.core.engine.BackgroundEnvironment; 055import org.ametys.core.user.UserIdentity; 056import org.ametys.core.user.population.UserPopulationDAO; 057import org.ametys.plugins.core.user.UserDAO; 058 059/** 060 * Manager for dispatching {@link Event} instances to {@link Observer}s. 061 */ 062public class ObservationManager extends AbstractLogEnabled implements Component, Serviceable, Contextualizable, Disposable 063{ 064 /** 065 * Store a future and the traits of the observer that it will execute 066 * @param future the future 067 * @param traits the traits 068 */ 069 public record ObserverFuture(Future future, Set<String> traits) { } 070 071 /** Avalon ROLE. */ 072 public static final String ROLE = ObservationManager.class.getName(); 073 074 private static final String __CURRENT_FUTURES_REQUEST_ATTR_NAME = ObservationManager.class.getName() + "$currentFutures"; 075 private static final String __ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME = ObservationManager.class.getName() + "$.additional.args"; 076 private static final Logger __ALL_EVENTS_LOGGER = LoggerFactory.getLogger("org.ametys.cms.observation.AllEvents"); 077 078 private static final Comparator<Observer> __OBSERVER_COMPARATOR = Comparator.comparing(Observer::getPriority); 079 080 /** 081 * The executor service managing the single thread pool. This threads is 082 * used to run non-parallelizable observers. 083 */ 084 private static ExecutorService __SINGLE_THREAD_EXECUTOR; 085 086 /** 087 * The executor service managing the thread pool of asynchronous observers 088 * allowed to run in parallel. 089 */ 090 private static ExecutorService __PARALLEL_THREAD_EXECUTOR; 091 092 /** Cocoon context */ 093 protected Context _context; 094 095 /** Service manager */ 096 protected ServiceManager _manager; 097 098 private org.apache.avalon.framework.context.Context _avalonContext; 099 private ObserverExtensionPoint _observerExtPt; 100 private Collection<Observer> _registeredObservers = new ArrayList<>(); 101 102 @Override 103 public void service(ServiceManager manager) throws ServiceException 104 { 105 _manager = manager; 106 _observerExtPt = (ObserverExtensionPoint) manager.lookup(ObserverExtensionPoint.ROLE); 107 } 108 109 @Override 110 public void contextualize(org.apache.avalon.framework.context.Context context) throws ContextException 111 { 112 _avalonContext = context; 113 _context = (org.apache.cocoon.environment.Context) context.get(Constants.CONTEXT_ENVIRONMENT_CONTEXT); 114 } 115 116 /** 117 * Notify of a event which will be dispatch to registered 118 * observers. 119 * @param event the event to notify. 120 * @return The {@link Future} objects of the asynchronous observers 121 */ 122 public List<Future> notify(final Event event) 123 { 124 List<Future> result = new ArrayList<>(); 125 126 try 127 { 128 if (getLogger().isDebugEnabled()) 129 { 130 getLogger().debug("Receiving " + event); 131 } 132 133 if (__ALL_EVENTS_LOGGER.isInfoEnabled()) 134 { 135 __ALL_EVENTS_LOGGER.info("Receiving " + event); 136 } 137 138 // Add the comparator to keep the right order with observers from the extension point 139 // and manually registered observers 140 List<Observer> supportedObservers = new ArrayList<>(); 141 142 // Retrieve supported observers from extension point 143 supportedObservers.addAll(_observerExtPt.getSupportingExtensions(event)); 144 145 // Retrieve supported observers from manual registration 146 supportedObservers.addAll( 147 _registeredObservers.stream() 148 .filter(obs -> obs.supports(event)) 149 .toList() 150 ); 151 152 supportedObservers.sort(__OBSERVER_COMPARATOR); 153 154 // Observes the event and prepares the asynchronous observes. 155 _putAdditionalArgs(event); 156 _observesEvent(event, supportedObservers, result); 157 } 158 catch (Exception e) 159 { 160 // Observers should never fail, so just log the error 161 getLogger().error("Unable to dispatch event: " + event + " to observers", e); 162 } 163 164 return result; 165 } 166 167 /** 168 * Gets all the {@link Future}s of all the {@link AsyncObserver}s notified by the call to {@link ObservationManager#notify(Event)} during the current request. 169 * <br>This can be useful to call {@link Future#get()} in order to ensure all {@link AsyncObserver}s notified by the current request have finished to work. 170 * @return the {@link Future}s of the notified observers during the current request. 171 */ 172 public List<ObserverFuture> getFuturesForRequest() 173 { 174 try 175 { 176 return Collections.unmodifiableList(_getFuturesForRequest()); 177 } 178 catch (CascadingRuntimeException e) 179 { 180 // can happen if no request 181 // return empty map 182 return List.of(); 183 } 184 } 185 186 private List<ObserverFuture> _getFuturesForRequest() throws CascadingRuntimeException 187 { 188 Request request = ContextHelper.getRequest(_avalonContext); 189 @SuppressWarnings("unchecked") 190 List<ObserverFuture> futuresForCurrentRequest = (List<ObserverFuture>) request.getAttribute(__CURRENT_FUTURES_REQUEST_ATTR_NAME); 191 if (futuresForCurrentRequest == null) 192 { 193 futuresForCurrentRequest = new ArrayList<>(); 194 request.setAttribute(__CURRENT_FUTURES_REQUEST_ATTR_NAME, futuresForCurrentRequest); 195 } 196 return futuresForCurrentRequest; 197 } 198 199 /** 200 * For all events of the given event id which will be notified during the current request, add an additional argument 201 * @param eventIds The event ids 202 * @param argName The name of the additional argument 203 * @param argValue The value of the additional argument 204 */ 205 public void addArgumentForEvents(String[] eventIds, String argName, Object argValue) 206 { 207 Request request = ContextHelper.getRequest(_avalonContext); 208 209 // Map eventId -> argName -> argValue 210 @SuppressWarnings("unchecked") 211 Map<String, Map<String, Object>> additionalArgsForEvents = (Map<String, Map<String, Object>>) request.getAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME); 212 if (additionalArgsForEvents == null) 213 { 214 additionalArgsForEvents = new HashMap<>(); 215 request.setAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME, additionalArgsForEvents); 216 } 217 218 for (String eventId : eventIds) 219 { 220 Map<String, Object> additionalArgs = additionalArgsForEvents.get(eventId); 221 if (additionalArgs == null) 222 { 223 additionalArgs = new HashMap<>(); 224 additionalArgsForEvents.put(eventId, additionalArgs); 225 } 226 227 additionalArgs.put(argName, argValue); 228 } 229 } 230 231 /** 232 * For all events of the given event id which will be notified during the current request, removes the additional argument 233 * with the given name which was added with the {@link ObservationManager#addArgumentForEvents(String[], String, Object)} method. 234 * @param eventIds The event ids 235 * @param argName The name of the additional argument to remove 236 */ 237 public void removeArgumentForEvents(String[] eventIds, String argName) 238 { 239 Request request = ContextHelper.getRequest(_avalonContext); 240 241 // Map eventId -> argName -> argValue 242 @SuppressWarnings("unchecked") 243 Map<String, Map<String, Object>> additionalArgsForEvents = (Map<String, Map<String, Object>>) request.getAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME); 244 if (additionalArgsForEvents == null) 245 { 246 return; 247 } 248 249 for (String eventId : eventIds) 250 { 251 Map<String, Object> additionalArgs = additionalArgsForEvents.get(eventId); 252 if (additionalArgs == null) 253 { 254 break; 255 } 256 257 additionalArgs.remove(argName); 258 } 259 } 260 261 private void _putAdditionalArgs(final Event event) 262 { 263 String eventId = event.getId(); 264 265 Request request = null; 266 try 267 { 268 request = ContextHelper.getRequest(_avalonContext); 269 } 270 catch (CascadingRuntimeException e) 271 { 272 // can happen if no request 273 // return empty list 274 } 275 if (request != null) 276 { 277 @SuppressWarnings("unchecked") 278 Map<String, Map<String, Object>> additionalArgsForEvents = (Map<String, Map<String, Object>>) request.getAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME); 279 if (additionalArgsForEvents != null) 280 { 281 Map<String, Object> additionalArgs = additionalArgsForEvents.get(eventId); 282 if (additionalArgs != null) 283 { 284 event.getArguments().putAll(additionalArgs); 285 } 286 } 287 } 288 } 289 290 /** 291 * Observes the event 292 * @param event The event 293 * @param supportedObservers list of observers 294 * @param result The result object to fill 295 * @throws Exception on error 296 */ 297 protected void _observesEvent(final Event event, List<Observer> supportedObservers, List<Future> result) throws Exception 298 { 299 List<AsyncObserver> parallelizableAsyncObservers = new ArrayList<>(); 300 List<AsyncObserver> nonParallelizableAsyncObservers = new ArrayList<>(); 301 302 Map<String, Object> transientVars = new HashMap<>(); 303 for (Observer supportedObserver : supportedObservers) 304 { 305 if (getLogger().isInfoEnabled()) 306 { 307 getLogger().info("Notifying observer: " + supportedObserver + " for event: " + event); 308 } 309 310 // Observe current event 311 if (supportedObserver instanceof AsyncObserver) 312 { 313 AsyncObserver asyncObserver = (AsyncObserver) supportedObserver; 314 boolean parallelizable = asyncObserver.parallelizable(); 315 316 if (getLogger().isDebugEnabled()) 317 { 318 getLogger().debug(String.format("Adding %s asynchronous observer: '%s' for event '%s' to the async queue.", 319 parallelizable ? "parallelizable" : "non-parallelizable", asyncObserver, event)); 320 } 321 if (__ALL_EVENTS_LOGGER.isDebugEnabled()) 322 { 323 __ALL_EVENTS_LOGGER.debug(String.format("Adding %s asynchronous observer: '%s' for event '%s' to the async queue.", 324 parallelizable ? "parallelizable" : "non-parallelizable", asyncObserver, event)); 325 } 326 327 if (parallelizable) 328 { 329 parallelizableAsyncObservers.add(asyncObserver); 330 } 331 else 332 { 333 nonParallelizableAsyncObservers.add(asyncObserver); 334 } 335 } 336 else 337 { 338 try 339 { 340 supportedObserver.observe(event, transientVars); 341 } 342 catch (Throwable e) 343 { 344 // Observation process should continue 345 getLogger().error("The synchronous observer '" + supportedObserver.getClass().getName() + "' failed to observe " + event, e); 346 } 347 } 348 } 349 350 // Observe for async observer. 351 if (!parallelizableAsyncObservers.isEmpty() || !nonParallelizableAsyncObservers.isEmpty()) 352 { 353 _asyncObserve(parallelizableAsyncObservers, nonParallelizableAsyncObservers, event, transientVars, result); 354 } 355 } 356 357 /** 358 * Async observe through a thread pool 359 * @param parallelObservers parallelizable asynchronous observers 360 * @param nonParallelObservers non parallelizable asynchronous observers 361 * @param event The event to observe 362 * @param transientVars The observer transient vars 363 * @param result The future results 364 */ 365 private void _asyncObserve(Collection<AsyncObserver> parallelObservers, List<AsyncObserver> nonParallelObservers, Event event, Map<String, Object> transientVars, List<Future> result) 366 { 367 // Keep in current request the futures of the curent request 368 List<ObserverFuture> futuresForCurrentRequest = null; 369 try 370 { 371 futuresForCurrentRequest = _getFuturesForRequest(); 372 } 373 catch (CascadingRuntimeException e) 374 { 375 // can happen if no request 376 // do nothing though 377 } 378 379 if (__SINGLE_THREAD_EXECUTOR == null) 380 { 381 AsyncObserveThreadFactory threadFactory = new AsyncObserveThreadFactory(); 382 383 __SINGLE_THREAD_EXECUTOR = Executors.newSingleThreadExecutor(threadFactory); 384 385 // 1 thread per core 386 __PARALLEL_THREAD_EXECUTOR = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), threadFactory); 387 } 388 389 if (!parallelObservers.isEmpty()) 390 { 391 for (AsyncObserver observer : parallelObservers) 392 { 393 Future future = __PARALLEL_THREAD_EXECUTOR.submit(new ParallelAsyncObserve(observer, event, transientVars, getLogger(), __ALL_EVENTS_LOGGER)); 394 result.add(future); 395 if (futuresForCurrentRequest != null) 396 { 397 futuresForCurrentRequest.add(new ObserverFuture(future, observer.getTraits())); 398 } 399 } 400 } 401 402 if (!nonParallelObservers.isEmpty()) 403 { 404 Future future = __SINGLE_THREAD_EXECUTOR.submit(new NonParallelAsyncObserve(nonParallelObservers, event, transientVars, getLogger(), __ALL_EVENTS_LOGGER)); 405 result.add(future); 406 if (futuresForCurrentRequest != null) 407 { 408 Set<String> traits = new HashSet<>(); 409 for (Observer observer : nonParallelObservers) 410 { 411 traits.addAll(observer.getTraits()); 412 } 413 futuresForCurrentRequest.add(new ObserverFuture(future, traits)); 414 } 415 } 416 } 417 418 /** 419 * Runnable to be used for asynchronous calls 420 */ 421 abstract class AbstractAsyncObserve implements Callable<Object> 422 { 423 /** event to observe */ 424 protected final Event _event; 425 protected final Map<String, Object> _transientVars; 426 protected final org.apache.avalon.framework.logger.Logger _logger; 427 protected final Logger _allEventLogger; 428 429 AbstractAsyncObserve(Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger) 430 { 431 _event = event; 432 _transientVars = transientVars; 433 _logger = logger; 434 _allEventLogger = allEventLogger; 435 } 436 437 @Override 438 public Object call() 439 { 440 Map<String, Object> environmentInformation = null; 441 Integer computedResult = 0; 442 try 443 { 444 // Create the environment. 445 environmentInformation = BackgroundEngineHelper.createAndEnterEngineEnvironment(_manager, _context, _logger); 446 447 _setUserInSession(environmentInformation); 448 449 _observe(); 450 } 451 catch (Exception e) 452 { 453 // Observer must never fail, so just log the error 454 _logger.error("Unable to dispatch event: " + _event + " to asynchronous observers", e); 455 computedResult = -1; 456 } 457 finally 458 { 459 // FIXME close jcr-sessions? (as in JCRSessionDispatchRequestProcess) 460 461 BackgroundEngineHelper.leaveEngineEnvironment(environmentInformation); 462 } 463 464 return computedResult; 465 } 466 467 private boolean _setUserInSession(Map<String, Object> environmentInformation) 468 { 469 UserIdentity userIdentity = Optional.ofNullable(_event.getIssuer()) 470 .orElse(UserPopulationDAO.SYSTEM_USER_IDENTITY); 471 472 BackgroundEnvironment bgEnv = (BackgroundEnvironment) environmentInformation.get("environment"); 473 Request request = ObjectModelHelper.getRequest(bgEnv.getObjectModel()); 474 475 AuthenticateAction.setUserIdentityInSession(request, userIdentity, new UserDAO.ImpersonateCredentialProvider(), true); 476 477 return true; 478 } 479 480 /** 481 * Abstract observe method where the observation should be done 482 * @throws Exception on error 483 */ 484 protected abstract void _observe() throws Exception; 485 } 486 487 /** 488 * Runnable for parallel observers 489 */ 490 class ParallelAsyncObserve extends AbstractAsyncObserve 491 { 492 private final AsyncObserver _observer; 493 494 ParallelAsyncObserve(AsyncObserver observer, Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger) 495 { 496 super(event, transientVars, logger, allEventLogger); 497 _observer = observer; 498 } 499 500 @Override 501 protected void _observe() throws Exception 502 { 503 if (_logger.isDebugEnabled()) 504 { 505 _logger.debug("Observing the asynchronous observer: " + _observer + " for event: " + _event + "."); 506 } 507 if (_allEventLogger.isDebugEnabled()) 508 { 509 _allEventLogger.debug("Observing the asynchronous observer: " + _observer + " for event: " + _event + "."); 510 } 511 512 _observer.observe(_event, _transientVars); 513 } 514 } 515 516 /** 517 * Runnable for non parallel observers 518 */ 519 class NonParallelAsyncObserve extends AbstractAsyncObserve 520 { 521 private final Collection<AsyncObserver> _observers; 522 523 NonParallelAsyncObserve(Collection<AsyncObserver> observers, Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger) 524 { 525 super(event, transientVars, logger, allEventLogger); 526 _observers = observers; 527 } 528 529 @Override 530 protected void _observe() throws Exception 531 { 532 for (AsyncObserver observer : _observers) 533 { 534 if (_logger.isDebugEnabled()) 535 { 536 _logger.debug("Observing the asynchronous observer: " + observer + " for event: " + _event + "."); 537 } 538 if (_allEventLogger.isDebugEnabled()) 539 { 540 _allEventLogger.debug("Observing the asynchronous observer: " + observer + " for event: " + _event + "."); 541 } 542 543 observer.observe(_event, _transientVars); 544 } 545 } 546 } 547 548 /** 549 * Thread factory for async observers. 550 * Set the thread name format and marks the thread as daemon. 551 */ 552 static class AsyncObserveThreadFactory implements ThreadFactory 553 { 554 private ThreadFactory _defaultThreadFactory; 555 private String _nameFormat; 556 private AtomicLong _count; 557 558 public AsyncObserveThreadFactory() 559 { 560 _defaultThreadFactory = Executors.defaultThreadFactory(); 561 _nameFormat = "ametys-async-observe-%d"; 562 _count = new AtomicLong(0); 563 } 564 565 public Thread newThread(Runnable r) 566 { 567 Thread thread = _defaultThreadFactory.newThread(r); 568 thread.setName(String.format(_nameFormat, _count.getAndIncrement())); 569 thread.setDaemon(true); 570 571 return thread; 572 } 573 } 574 575 @Override 576 public void dispose() 577 { 578 if (__SINGLE_THREAD_EXECUTOR != null) 579 { 580 __SINGLE_THREAD_EXECUTOR.shutdownNow(); 581 __SINGLE_THREAD_EXECUTOR = null; 582 } 583 if (__PARALLEL_THREAD_EXECUTOR != null) 584 { 585 __PARALLEL_THREAD_EXECUTOR.shutdownNow(); 586 __PARALLEL_THREAD_EXECUTOR = null; 587 } 588 589 _context = null; 590 _manager = null; 591 } 592 593 /** 594 * Registers an {@link Observer}. 595 * @param observer the {@link Observer}. 596 */ 597 public void registerObserver(Observer observer) 598 { 599 _registeredObservers.add(observer); 600 } 601 602 /** 603 * Unregisters an {@link Observer}. 604 * @param observer the {@link Observer}. 605 */ 606 public void unregisterObserver(Observer observer) 607 { 608 _registeredObservers.remove(observer); 609 } 610}