/*
 *  Copyright 2013 Anyware Services
 *
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
package org.ametys.core.observation;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.avalon.framework.CascadingRuntimeException;
import org.apache.avalon.framework.activity.Disposable;
import org.apache.avalon.framework.component.Component;
import org.apache.avalon.framework.context.ContextException;
import org.apache.avalon.framework.context.Contextualizable;
import org.apache.avalon.framework.logger.AbstractLogEnabled;
import org.apache.avalon.framework.service.ServiceException;
import org.apache.avalon.framework.service.ServiceManager;
import org.apache.avalon.framework.service.Serviceable;
import org.apache.cocoon.Constants;
import org.apache.cocoon.components.ContextHelper;
import org.apache.cocoon.environment.Context;
import org.apache.cocoon.environment.ObjectModelHelper;
import org.apache.cocoon.environment.Request;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.ametys.core.authentication.AuthenticateAction;
import org.ametys.core.engine.BackgroundEngineHelper;
import org.ametys.core.engine.BackgroundEnvironment;
import org.ametys.core.user.UserIdentity;
import org.ametys.core.user.population.UserPopulationDAO;
import org.ametys.plugins.core.user.UserDAO;

/**
 * Manager for dispatching {@link Event} instances to {@link Observer}s.
 * <br>Synchronous observers are run in the calling thread, in priority order.
 * {@link AsyncObserver}s are submitted to shared thread pools: one task per
 * parallelizable observer, and a single task running all the non-parallelizable
 * observers one after the other.
 */
public class ObservationManager extends AbstractLogEnabled implements Component, Serviceable, Contextualizable, Disposable
{
    /**
     * Store a future and the traits of the observer that it will execute.
     * <br>For the task grouping the non-parallelizable observers, the traits are
     * the union of the traits of all the grouped observers.
     * @param future the future
     * @param traits the traits
     */
    public record ObserverFuture(Future future, Set<String> traits) { }
    
    /** Avalon ROLE. */
    public static final String ROLE = ObservationManager.class.getName();
    
    private static final String __CURRENT_FUTURES_REQUEST_ATTR_NAME = ObservationManager.class.getName() + "$currentFutures";
    private static final String __ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME = ObservationManager.class.getName() + "$.additional.args";
    private static final Logger __ALL_EVENTS_LOGGER = LoggerFactory.getLogger("org.ametys.cms.observation.AllEvents");
    
    /**
     * The executor service managing the single thread pool. This thread is
     * used to run non-parallelizable observers.
     * Created lazily by {@link #_ensureExecutors()}.
     */
    private static ExecutorService __SINGLE_THREAD_EXECUTOR;
    
    /**
     * The executor service managing the thread pool of asynchronous observers
     * allowed to run in parallel.
     * Created lazily by {@link #_ensureExecutors()}.
     */
    private static ExecutorService __PARALLEL_THREAD_EXECUTOR;
    
    /** Cocoon context */
    protected Context _context;
    
    /** Service manager */
    protected ServiceManager _manager;
    
    private org.apache.avalon.framework.context.Context _avalonContext;
    private ObserverExtensionPoint _observerExtPt;
    private final Collection<Observer> _registeredObservers = new ArrayList<>();
    
    @Override
    public void service(ServiceManager manager) throws ServiceException
    {
        _manager = manager;
        _observerExtPt = (ObserverExtensionPoint) manager.lookup(ObserverExtensionPoint.ROLE);
    }
    
    @Override
    public void contextualize(org.apache.avalon.framework.context.Context context) throws ContextException
    {
        _avalonContext = context;
        _context = (org.apache.cocoon.environment.Context) context.get(Constants.CONTEXT_ENVIRONMENT_CONTEXT);
    }
    
    /**
     * Notify of an event which will be dispatched to the observers supporting it,
     * i.e. those declared on the extension point and those manually registered.
     * <br>Any exception raised while dispatching is logged and not propagated.
     * @param event the event to notify.
     * @return The {@link Future} objects of the asynchronous observers. Empty if there is none or if the dispatch failed early.
     */
    public List<Future> notify(final Event event)
    {
        List<Future> result = new ArrayList<>();
        
        try
        {
            if (getLogger().isDebugEnabled())
            {
                getLogger().debug("Receiving " + event);
            }
            
            if (__ALL_EVENTS_LOGGER.isInfoEnabled())
            {
                __ALL_EVENTS_LOGGER.info("Receiving " + event);
            }
            
            List<Observer> supportedObservers = new ArrayList<>();
            
            // Retrieve supported observers from extension point
            for (String observerId : _observerExtPt.getExtensionsIds())
            {
                _addSupportedObserver(supportedObservers, _observerExtPt.getExtension(observerId), event);
            }
            
            // Retrieve supported observers from manual registration
            for (Observer observer : _registeredObservers)
            {
                _addSupportedObserver(supportedObservers, observer, event);
            }
            
            // Order observers (0 is first, Integer.MAX_VALUE is last)
            supportedObservers.sort(Comparator.comparingInt(observer -> observer.getPriority(event)));
            
            // Observes the event and prepares the asynchronous observes.
            _putAdditionalArgs(event);
            _observesEvent(event, supportedObservers, result);
        }
        catch (Exception e)
        {
            // Observers should never fail, so just log the error
            getLogger().error("Unable to dispatch event: " + event + " to observers", e);
        }
        
        return result;
    }
    
    /**
     * Adds the observer to the given list if it supports the event.
     * @param supportedObservers The list to fill
     * @param observer The observer to test
     * @param event The event
     */
    private void _addSupportedObserver(List<Observer> supportedObservers, Observer observer, final Event event)
    {
        boolean debugEnabled = getLogger().isDebugEnabled();
        if (debugEnabled)
        {
            getLogger().debug("Checking support for event: " + event + " and observer: " + observer);
        }
        
        if (!observer.supports(event))
        {
            return;
        }
        
        if (debugEnabled)
        {
            getLogger().debug("Event: " + event + " supported for observer: " + observer);
        }
        
        supportedObservers.add(observer);
    }
    
    /**
     * Gets all the {@link Future}s of all the {@link AsyncObserver}s notified by the call to {@link ObservationManager#notify(Event)} during the current request.
     * <br>This can be useful to call {@link Future#get()} in order to ensure all {@link AsyncObserver}s notified by the current request have finished to work.
     * @return an unmodifiable view of the {@link Future}s of the notified observers during the current request. Empty if there is no current request.
     */
    public List<ObserverFuture> getFuturesForRequest()
    {
        try
        {
            return Collections.unmodifiableList(_getFuturesForRequest());
        }
        catch (CascadingRuntimeException e)
        {
            // can happen if no request
            // return empty list
            return List.of();
        }
    }
    
    /**
     * Gets the mutable list of futures stored in the current request, creating and storing it on first access.
     * @return the list of futures attached to the current request
     * @throws CascadingRuntimeException if the current request cannot be retrieved
     */
    private List<ObserverFuture> _getFuturesForRequest() throws CascadingRuntimeException
    {
        Request request = ContextHelper.getRequest(_avalonContext);
        @SuppressWarnings("unchecked")
        List<ObserverFuture> futuresForCurrentRequest = (List<ObserverFuture>) request.getAttribute(__CURRENT_FUTURES_REQUEST_ATTR_NAME);
        if (futuresForCurrentRequest == null)
        {
            futuresForCurrentRequest = new ArrayList<>();
            request.setAttribute(__CURRENT_FUTURES_REQUEST_ATTR_NAME, futuresForCurrentRequest);
        }
        return futuresForCurrentRequest;
    }
    
    /**
     * Reads from the request the additional arguments registered for the events.
     * @param request The request
     * @return the map eventId -&gt; argName -&gt; argValue, or <code>null</code> if nothing was registered on this request
     */
    @SuppressWarnings("unchecked")
    private Map<String, Map<String, Object>> _getAdditionalArgsByEventId(Request request)
    {
        // Only this class writes this attribute, always with this type
        return (Map<String, Map<String, Object>>) request.getAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME);
    }
    
    /**
     * For all events of the given event ids which will be notified during the current request, add an additional argument
     * @param eventIds The event ids
     * @param argName The name of the additional argument
     * @param argValue The value of the additional argument
     */
    public void addArgumentForEvents(String[] eventIds, String argName, Object argValue)
    {
        Request request = ContextHelper.getRequest(_avalonContext);
        
        // Map eventId -> argName -> argValue
        Map<String, Map<String, Object>> additionalArgsForEvents = _getAdditionalArgsByEventId(request);
        if (additionalArgsForEvents == null)
        {
            additionalArgsForEvents = new HashMap<>();
            request.setAttribute(__ADDITIONAL_ARGS_TO_PASS_BY_EVENT_ID_REQUEST_ATTR_NAME, additionalArgsForEvents);
        }
        
        for (String eventId : eventIds)
        {
            additionalArgsForEvents.computeIfAbsent(eventId, id -> new HashMap<>())
                                   .put(argName, argValue);
        }
    }
    
    /**
     * For all events of the given event ids which will be notified during the current request, removes the additional argument
     * with the given name which was added with the {@link ObservationManager#addArgumentForEvents(String[], String, Object)} method.
     * <br>Event ids for which no argument was registered are ignored.
     * @param eventIds The event ids
     * @param argName The name of the additional argument to remove
     */
    public void removeArgumentForEvents(String[] eventIds, String argName)
    {
        Request request = ContextHelper.getRequest(_avalonContext);
        
        // Map eventId -> argName -> argValue
        Map<String, Map<String, Object>> additionalArgsForEvents = _getAdditionalArgsByEventId(request);
        if (additionalArgsForEvents == null)
        {
            return;
        }
        
        for (String eventId : eventIds)
        {
            Map<String, Object> additionalArgs = additionalArgsForEvents.get(eventId);
            if (additionalArgs == null)
            {
                // Nothing registered for this event id: skip it but keep processing the following ids
                continue;
            }
            
            additionalArgs.remove(argName);
        }
    }
    
    /**
     * Copies into the event arguments the additional arguments registered
     * for its id on the current request, if any. Does nothing when there is no current request.
     * @param event The event to complete
     */
    private void _putAdditionalArgs(final Event event)
    {
        Request request;
        try
        {
            request = ContextHelper.getRequest(_avalonContext);
        }
        catch (CascadingRuntimeException e)
        {
            // can happen if no request
            // there is nothing to add then
            return;
        }
        
        if (request == null)
        {
            return;
        }
        
        Map<String, Map<String, Object>> additionalArgsForEvents = _getAdditionalArgsByEventId(request);
        if (additionalArgsForEvents == null)
        {
            return;
        }
        
        Map<String, Object> additionalArgs = additionalArgsForEvents.get(event.getId());
        if (additionalArgs != null)
        {
            event.getArguments().putAll(additionalArgs);
        }
    }
    
    /**
     * Observes the event: synchronous observers are called immediately, in the given order,
     * then the asynchronous observers are submitted to the thread pools.
     * <br>The same transient variables map is handed to every observer, synchronous or not.
     * @param event The event
     * @param supportedObservers list of observers
     * @param result The result object to fill
     * @throws Exception on error
     */
    protected void _observesEvent(final Event event, List<Observer> supportedObservers, List<Future> result) throws Exception
    {
        List<AsyncObserver> parallelizableAsyncObservers = new ArrayList<>();
        List<AsyncObserver> nonParallelizableAsyncObservers = new ArrayList<>();
        
        Map<String, Object> transientVars = new HashMap<>();
        for (Observer supportedObserver : supportedObservers)
        {
            if (getLogger().isInfoEnabled())
            {
                getLogger().info("Notifying observer: " + supportedObserver + " for event: " + event);
            }
            
            // Observe current event
            if (supportedObserver instanceof AsyncObserver asyncObserver)
            {
                boolean parallelizable = asyncObserver.parallelizable();
                _logAsyncQueueing(asyncObserver, event, parallelizable);
                
                if (parallelizable)
                {
                    parallelizableAsyncObservers.add(asyncObserver);
                }
                else
                {
                    nonParallelizableAsyncObservers.add(asyncObserver);
                }
            }
            else
            {
                try
                {
                    supportedObserver.observe(event, transientVars);
                }
                catch (Throwable e)
                {
                    // Observation process should continue
                    getLogger().error("The synchronous observer '" + supportedObserver.getClass().getName() + "' failed to observe " + event, e);
                }
            }
        }
        
        // Observe for async observer.
        if (!parallelizableAsyncObservers.isEmpty() || !nonParallelizableAsyncObservers.isEmpty())
        {
            _asyncObserve(parallelizableAsyncObservers, nonParallelizableAsyncObservers, event, transientVars, result);
        }
    }
    
    /**
     * Logs, at debug level on both loggers, that an asynchronous observer is being queued.
     * @param asyncObserver The queued observer
     * @param event The event
     * @param parallelizable true if the observer is parallelizable
     */
    private void _logAsyncQueueing(AsyncObserver asyncObserver, Event event, boolean parallelizable)
    {
        boolean debugEnabled = getLogger().isDebugEnabled();
        boolean allEventsDebugEnabled = __ALL_EVENTS_LOGGER.isDebugEnabled();
        if (!debugEnabled && !allEventsDebugEnabled)
        {
            return;
        }
        
        String message = String.format("Adding %s asynchronous observer: '%s' for event '%s' to the async queue.",
                parallelizable ? "parallelizable" : "non-parallelizable", asyncObserver, event);
        
        if (debugEnabled)
        {
            getLogger().debug(message);
        }
        if (allEventsDebugEnabled)
        {
            __ALL_EVENTS_LOGGER.debug(message);
        }
    }
    
    /**
     * Creates the shared executors if they do not exist yet.
     * Synchronized so that concurrent first notifications cannot create (and leak) several pools.
     */
    private static synchronized void _ensureExecutors()
    {
        if (__SINGLE_THREAD_EXECUTOR == null)
        {
            ThreadFactory threadFactory = new AsyncObserveThreadFactory();
            
            __SINGLE_THREAD_EXECUTOR = Executors.newSingleThreadExecutor(threadFactory);
            
            // 1 thread per core
            __PARALLEL_THREAD_EXECUTOR = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), threadFactory);
        }
    }
    
    /**
     * Shuts down the shared executors, if any, and forgets them so that they can be re-created later.
     */
    private static synchronized void _shutdownExecutors()
    {
        if (__SINGLE_THREAD_EXECUTOR != null)
        {
            __SINGLE_THREAD_EXECUTOR.shutdownNow();
            __SINGLE_THREAD_EXECUTOR = null;
        }
        if (__PARALLEL_THREAD_EXECUTOR != null)
        {
            __PARALLEL_THREAD_EXECUTOR.shutdownNow();
            __PARALLEL_THREAD_EXECUTOR = null;
        }
    }
    
    /**
     * Async observe through a thread pool.
     * <br>Each parallelizable observer gets its own task; all the non-parallelizable observers share a single task.
     * Every created future is added to the result and, when there is a current request, recorded on it.
     * @param parallelObservers parallelizable asynchronous observers
     * @param nonParallelObservers non parallelizable asynchronous observers
     * @param event The event to observe
     * @param transientVars The observer transient vars
     * @param result The future results
     */
    private void _asyncObserve(Collection<AsyncObserver> parallelObservers, List<AsyncObserver> nonParallelObservers, Event event, Map<String, Object> transientVars, List<Future> result)
    {
        // Keep in current request the futures of the current request
        List<ObserverFuture> futuresForCurrentRequest = null;
        try
        {
            futuresForCurrentRequest = _getFuturesForRequest();
        }
        catch (CascadingRuntimeException e)
        {
            // can happen if no request
            // do nothing though
        }
        
        _ensureExecutors();
        
        for (AsyncObserver observer : parallelObservers)
        {
            Future future = __PARALLEL_THREAD_EXECUTOR.submit(new ParallelAsyncObserve(observer, event, transientVars, getLogger(), __ALL_EVENTS_LOGGER));
            result.add(future);
            if (futuresForCurrentRequest != null)
            {
                futuresForCurrentRequest.add(new ObserverFuture(future, observer.getTraits()));
            }
        }
        
        if (!nonParallelObservers.isEmpty())
        {
            Future future = __SINGLE_THREAD_EXECUTOR.submit(new NonParallelAsyncObserve(nonParallelObservers, event, transientVars, getLogger(), __ALL_EVENTS_LOGGER));
            result.add(future);
            if (futuresForCurrentRequest != null)
            {
                // A single future covers all these observers: expose the union of their traits
                Set<String> traits = new HashSet<>();
                for (Observer observer : nonParallelObservers)
                {
                    traits.addAll(observer.getTraits());
                }
                futuresForCurrentRequest.add(new ObserverFuture(future, traits));
            }
        }
    }
    
    /**
     * Callable to be used for asynchronous calls.
     * <br>Wraps the observation in a background engine environment in which the issuer
     * of the event (or the system user if there is none) is set as the current user.
     * The result of the call is <code>0</code> on success and <code>-1</code> if an exception occurred.
     */
    abstract class AbstractAsyncObserve implements Callable<Object>
    {
        /** event to observe */
        protected final Event _event;
        /** transient variables handed to the observers */
        protected final Map<String, Object> _transientVars;
        /** logger of the manager */
        protected final org.apache.avalon.framework.logger.Logger _logger;
        /** logger receiving all events */
        protected final Logger _allEventLogger;
        
        AbstractAsyncObserve(Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger)
        {
            _event = event;
            _transientVars = transientVars;
            _logger = logger;
            _allEventLogger = allEventLogger;
        }
        
        @Override
        public Object call()
        {
            Map<String, Object> environmentInformation = null;
            int computedResult = 0;
            try
            {
                // Create the environment.
                environmentInformation = BackgroundEngineHelper.createAndEnterEngineEnvironment(_manager, _context, _logger);
                
                _setUserInSession(environmentInformation);
                
                _observe();
            }
            catch (Exception e)
            {
                // Observer must never fail, so just log the error
                _logger.error("Unable to dispatch event: " + _event + " to asynchronous observers", e);
                computedResult = -1;
            }
            finally
            {
                // FIXME close jcr-sessions? (as in JCRSessionDispatchRequestProcess)
                
                // Still null if the environment creation itself failed: there is nothing to leave then
                if (environmentInformation != null)
                {
                    BackgroundEngineHelper.leaveEngineEnvironment(environmentInformation);
                }
            }
            
            return computedResult;
        }
        
        /**
         * Sets, in the session of the background request, the issuer of the event
         * or the system user when the event has no issuer.
         * @param environmentInformation The information returned when entering the background environment
         */
        private void _setUserInSession(Map<String, Object> environmentInformation)
        {
            UserIdentity userIdentity = Optional.ofNullable(_event.getIssuer())
                                                .orElse(UserPopulationDAO.SYSTEM_USER_IDENTITY);
            
            BackgroundEnvironment bgEnv = (BackgroundEnvironment) environmentInformation.get("environment");
            Request request = ObjectModelHelper.getRequest(bgEnv.getObjectModel());
            
            AuthenticateAction.setUserIdentityInSession(request, userIdentity, new UserDAO.ImpersonateCredentialProvider(), true);
        }
        
        /**
         * Logs, at debug level on both loggers, that the given observer is about to observe the event.
         * @param observer The observer
         */
        protected void _logObserving(AsyncObserver observer)
        {
            if (_logger.isDebugEnabled())
            {
                _logger.debug("Observing the asynchronous observer: " + observer + " for event: " + _event + ".");
            }
            if (_allEventLogger.isDebugEnabled())
            {
                _allEventLogger.debug("Observing the asynchronous observer: " + observer + " for event: " + _event + ".");
            }
        }
        
        /**
         * Abstract observe method where the observation should be done
         * @throws Exception on error
         */
        protected abstract void _observe() throws Exception;
    }
    
    /**
     * Callable for a single parallelizable observer
     */
    class ParallelAsyncObserve extends AbstractAsyncObserve
    {
        private final AsyncObserver _observer;
        
        ParallelAsyncObserve(AsyncObserver observer, Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger)
        {
            super(event, transientVars, logger, allEventLogger);
            _observer = observer;
        }
        
        @Override
        protected void _observe() throws Exception
        {
            _logObserving(_observer);
            _observer.observe(_event, _transientVars);
        }
    }
    
    /**
     * Callable for non parallelizable observers, which are run one after the other in iteration order.
     * <br>An exception thrown by one observer stops the iteration: the following observers are not called.
     */
    class NonParallelAsyncObserve extends AbstractAsyncObserve
    {
        private final Collection<AsyncObserver> _observers;
        
        NonParallelAsyncObserve(Collection<AsyncObserver> observers, Event event, Map<String, Object> transientVars, org.apache.avalon.framework.logger.Logger logger, Logger allEventLogger)
        {
            super(event, transientVars, logger, allEventLogger);
            _observers = observers;
        }
        
        @Override
        protected void _observe() throws Exception
        {
            for (AsyncObserver observer : _observers)
            {
                _logObserving(observer);
                observer.observe(_event, _transientVars);
            }
        }
    }
    
    /**
     * Thread factory for async observers.
     * Set the thread name format and marks the thread as daemon.
     */
    static class AsyncObserveThreadFactory implements ThreadFactory
    {
        private final ThreadFactory _defaultThreadFactory;
        private final String _nameFormat;
        private final AtomicLong _count;
        
        public AsyncObserveThreadFactory()
        {
            _defaultThreadFactory = Executors.defaultThreadFactory();
            _nameFormat = "ametys-async-observe-%d";
            _count = new AtomicLong(0);
        }
        
        @Override
        public Thread newThread(Runnable r)
        {
            Thread thread = _defaultThreadFactory.newThread(r);
            thread.setName(String.format(_nameFormat, _count.getAndIncrement()));
            thread.setDaemon(true);
            
            return thread;
        }
    }
    
    @Override
    public void dispose()
    {
        _shutdownExecutors();
        
        _context = null;
        _manager = null;
    }
    
    /**
     * Registers an {@link Observer}.
     * @param observer the {@link Observer}.
     */
    public void registerObserver(Observer observer)
    {
        _registeredObservers.add(observer);
    }
    
    /**
     * Unregisters an {@link Observer}.
     * @param observer the {@link Observer}.
     */
    public void unregisterObserver(Observer observer)
    {
        _registeredObservers.remove(observer);
    }
}