001/*
002 *  Copyright 2010 Anyware Services
003 *
004 *  Licensed under the Apache License, Version 2.0 (the "License");
005 *  you may not use this file except in compliance with the License.
006 *  You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 *  Unless required by applicable law or agreed to in writing, software
011 *  distributed under the License is distributed on an "AS IS" BASIS,
012 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 *  See the License for the specific language governing permissions and
014 *  limitations under the License.
015 */
016package org.ametys.cms.content.consistency;
017
018import java.time.ZonedDateTime;
019import java.util.ArrayList;
020import java.util.HashMap;
021import java.util.Iterator;
022import java.util.List;
023import java.util.Map;
024import java.util.Optional;
025import java.util.concurrent.Callable;
026import java.util.concurrent.CancellationException;
027import java.util.concurrent.ExecutionException;
028import java.util.concurrent.Executor;
029import java.util.concurrent.ExecutorService;
030import java.util.concurrent.Executors;
031import java.util.concurrent.Future;
032import java.util.concurrent.ThreadFactory;
033import java.util.concurrent.atomic.AtomicLong;
034
035import javax.jcr.RepositoryException;
036
037import org.apache.avalon.framework.activity.Initializable;
038import org.apache.avalon.framework.component.Component;
039import org.apache.avalon.framework.context.ContextException;
040import org.apache.avalon.framework.context.Contextualizable;
041import org.apache.avalon.framework.logger.Logger;
042import org.apache.avalon.framework.service.ServiceException;
043import org.apache.avalon.framework.service.ServiceManager;
044import org.apache.avalon.framework.service.Serviceable;
045import org.apache.cocoon.Constants;
046import org.apache.cocoon.environment.Context;
047import org.apache.cocoon.util.log.SLF4JLoggerAdapter;
048
049import org.ametys.cms.content.references.OutgoingReferences;
050import org.ametys.cms.repository.Content;
051import org.ametys.cms.repository.ContentQueryHelper;
052import org.ametys.cms.repository.DefaultContent;
053import org.ametys.cms.repository.WorkflowAwareContent;
054import org.ametys.cms.transformation.ConsistencyChecker;
055import org.ametys.cms.transformation.ConsistencyChecker.CHECK;
056import org.ametys.core.engine.BackgroundEngineHelper;
057import org.ametys.core.util.DateUtils;
058import org.ametys.plugins.repository.AmetysObjectIterable;
059import org.ametys.plugins.repository.AmetysObjectResolver;
060import org.ametys.plugins.repository.AmetysRepositoryException;
061import org.ametys.plugins.repository.ModifiableAmetysObject;
062import org.ametys.plugins.repository.ModifiableTraversableAmetysObject;
063import org.ametys.plugins.repository.RepositoryConstants;
064import org.ametys.plugins.repository.query.QueryHelper;
065import org.ametys.plugins.repository.query.expression.AndExpression;
066import org.ametys.plugins.repository.query.expression.Expression;
067import org.ametys.plugins.repository.query.expression.Expression.Operator;
068import org.ametys.plugins.repository.query.expression.StringExpression;
069import org.ametys.plugins.repositoryapp.RepositoryProvider;
070import org.ametys.runtime.plugin.component.AbstractLogEnabled;
071
072/**
 * Manages all the operations related to checking the consistency of contents
074 */
075public class ContentConsistencyManager extends AbstractLogEnabled implements Initializable, Contextualizable, Serviceable, Component
076{
077
    /** The avalon role */
    public static final String ROLE = ContentConsistencyManager.class.getName();
    
    /** The executor running the asynchronous checks. It is held in a static field and (re)assigned by {@link #initialize()} */
    private static ExecutorService __PARALLEL_THREAD_EXECUTOR;
    /** The factor applied to the number of available processors to compute the size of the thread pool */
    private static final int __THREAD_POOL_SIZE_MULTIPLIER = 4;
    
    /** The ametys object resolver. */
    protected AmetysObjectResolver _resolver;
    
    /** The consistency checker */
    protected ConsistencyChecker _consistencyChecker;
    
    /** The Cocoon environment context, used to create the background environment of the checker threads */
    private Context _cocoonContext;
    /** The repository provider, used to refresh the "default" session once the checks are done */
    private RepositoryProvider _repositoryProvider;
    /** The service manager, handed over to the background environment of the checker threads */
    private ServiceManager _manager;
093
094    public void contextualize(org.apache.avalon.framework.context.Context context) throws ContextException
095    {
096        _cocoonContext = (Context) context.get(Constants.CONTEXT_ENVIRONMENT_CONTEXT);
097    }
098    
099    public void service(ServiceManager manager) throws ServiceException
100    {
101        _manager = manager;
102        _consistencyChecker = (ConsistencyChecker) manager.lookup(ConsistencyChecker.ROLE);
103        _resolver = (AmetysObjectResolver) manager.lookup(AmetysObjectResolver.ROLE);
104        _repositoryProvider = (RepositoryProvider) manager.lookup(RepositoryProvider.ROLE);
105    }
106
107    public void initialize() throws Exception
108    {
109        AsyncConsistencyCheckerThreadFactory threadFactory = new AsyncConsistencyCheckerThreadFactory();
110        // The thread are doing a lot of heavy IO operations, it's worth going over the number of available processors
111        __PARALLEL_THREAD_EXECUTOR = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * __THREAD_POOL_SIZE_MULTIPLIER, threadFactory);
112    }
113    
114    /**
115     * Thread factory for async checker.
116     * Set the thread name format and marks the thread as daemon. 
117     */
118    static class AsyncConsistencyCheckerThreadFactory implements ThreadFactory
119    {
120        private static ThreadFactory _defaultThreadFactory;
121        private static String _nameFormat;
122        private static AtomicLong _count;
123        
124        public AsyncConsistencyCheckerThreadFactory()
125        {
126            _defaultThreadFactory = Executors.defaultThreadFactory();
127            _nameFormat = "ametys-async-consistency-checker-%d";
128            _count = new AtomicLong(0);
129        }
130        
131        public Thread newThread(Runnable r)
132        {
133            Thread thread = _defaultThreadFactory.newThread(r);
134            thread.setName(String.format(_nameFormat, _count.getAndIncrement()));
135            // make the threads low priority daemon to avoid slowing user thread
136            thread.setDaemon(true);
137            thread.setPriority(3);
138            
139            return thread;
140        }
141    }
142    
143    /**
144     * Getter to provide synthetic access to the manager
145     */
146    private ServiceManager getManager()
147    {
148        return _manager;
149    }
150    
151    /**
152     * Runnable to be used for asynchronous calls 
153     */
154    class AsyncConsistencyChecker implements Callable<String>
155    {
156        /** event to observe */
157        protected final Logger _logger;
158        private final Content _content;
159        
160        public AsyncConsistencyChecker(Content content, org.slf4j.Logger logger)
161        {
162            this._logger = new SLF4JLoggerAdapter(logger);
163            this._content = content;
164        }
165        
166        @Override
167        public String call()
168        {
169            Map<String, Object> environmentInformation = null;
170            try
171            {
172                // Create the environment.
173                environmentInformation = BackgroundEngineHelper.createAndEnterEngineEnvironment(getManager(), _cocoonContext, _logger);
174                
175                return _checkConsistency(_content);
176            }
177            catch (Exception e)
178            {
179                throw new RuntimeException("Content consistency check for content " + _content.getId() + " failed", e);
180            }
181            finally
182            {
183                BackgroundEngineHelper.leaveEngineEnvironment(environmentInformation);
184            }
185        }
186    }
187    
188    /**
189     * Check all contents to see if there references are consistent.
190     * All result will also be stored in the consistency result database table for later access.
191     * @return record describing the results of the checks
192     */
193    public ConsistenciesReport checkAllContents() 
194    {
195        // Get the start date to remove outdated results later
196        ZonedDateTime startDate = ZonedDateTime.now();
197        try (AmetysObjectIterable<Content> contents = _getContents(null))
198        {
199            ConsistenciesReport checkReport = _checkContents(contents);
200            // the checkContents only remove possible outdated results for the set of content provided as arguments.
201            // We need to remove all the other result : deleted content, deleted link, etc…
202            removeOutdatedResult(startDate, null);
203            return checkReport;
204        }
205    }
206    
207    /**
208     * Remove results older than a given date
209     * @param date the threshold
210     * @param filterExpression an expression to filter the content to consider, null to consider all contents
211     */
212    protected void removeOutdatedResult(ZonedDateTime date, Expression filterExpression)
213    {
214        String xPathQuery = QueryHelper.getXPathQuery(null, "ametys:consistencyResult", filterExpression);
215        xPathQuery += "[@ametys:" + ContentConsistencyResult.DATE + " < xs:dateTime('" + DateUtils.zonedDateTimeToString(date) + "')]";
216        try (AmetysObjectIterable<ContentConsistencyResult> outdatedResults = _resolver.query(xPathQuery))
217        {
218            for (ContentConsistencyResult outdatedResult : outdatedResults)
219            {
220                try
221                {
222                    outdatedResult.remove();
223                    outdatedResult.saveChanges();
224                }
225                catch (AmetysRepositoryException e)
226                {
227                    getLogger().warn("Failed to remove outdated result '{}' due to repository error", outdatedResult.getId(), e);
228                }
229            }
230        }
231    }
232
    /**
     * Record holding the outcome of consistency checks run on multiple contents
     * @param results the ids of the {@link ContentConsistencyResult}s stored for the checked contents. Contents for which no result was stored are not listed.
     * @param unchecked the ids of the contents whose check was interrupted, cancelled or failed during execution
     */
    public record ConsistenciesReport(List<String> results, List<String> unchecked) { /* Record */ }
239
240    /**
241     * Check all the content provided by the iterable for broken links.
242     * This method will actually provide {@link Callable} to an {@link Executor} that parallelize the checks.
243     * @param contents an iterable of contents to check
244     * @return record describing the results of the checks
245     */
246    protected ConsistenciesReport _checkContents(AmetysObjectIterable<Content> contents)
247    {
248        try 
249        {
250            List<AsyncConsistencyChecker> checkers = contents.stream()
251                .map(content -> new AsyncConsistencyChecker(content, getLogger()))
252                .toList();
253            
254            // execute all checker and wait for their completion (either success or failure)
255            List<Future<String>> futures = __PARALLEL_THREAD_EXECUTOR.invokeAll(checkers);
256            
257            // Refresh the session to retrieve the repository modification from the threads
258            _repositoryProvider.getSession("default").refresh(true);
259            
260            Iterator<Future<String>> fIterarot = futures.iterator();
261            Iterator<Content> cIterator = contents.iterator();
262            
263            List<String> done = new ArrayList<>();
264            List<String> failed = new ArrayList<>();
265            
266            // both iterator should have the same size as the future iterator was mapped from the content iterator
267            while (fIterarot.hasNext() && cIterator.hasNext())
268            {
269                Future<String> future = fIterarot.next();
270                Content content = cIterator.next();
271                
272                try
273                {
274                    String result = future.get();
275                    if (result != null)
276                    {
277                        done.add(result);
278                    }
279                }
280                catch (CancellationException | InterruptedException | ExecutionException e)
281                {
282                    String contentId = content.getId();
283                    getLogger().error("Failed to retrieve result from content consistency checker thread for content {}", contentId, e);
284                    failed.add(contentId);
285                }
286            }
287            return new ConsistenciesReport(done, failed);
288        }
289        catch (InterruptedException e)
290        {
291            getLogger().error("Content consistency check was interrupted", e);
292            return null;
293        }
294        catch (RepositoryException e1)
295        {
296            getLogger().error("Failed to refresh the session");
297            return null;
298        }
299    }
300    
301    private String _checkConsistency(Content content)
302    {
303        int successCount = 0;
304        int unknownCount = 0;
305        int unauthorizedCount = 0;
306        int notFoundCount = 0;
307        int serverErrorCount = 0;
308        
309        Map<String, OutgoingReferences> referencesByPath = content.getOutgoingReferences();
310        
311        for (String dataPath : referencesByPath.keySet())
312        {
313            OutgoingReferences references = referencesByPath.get(dataPath);
314            for (String referenceType : references.keySet())
315            {
316                for (String referenceValue : references.get(referenceType))
317                {
318                    CHECK check;
319                    try
320                    {
321                        check = _consistencyChecker.checkConsistency(referenceType, referenceValue, content.getId(), dataPath, false).status();
322                    }
323                    catch (Exception e)
324                    {
325                        check = CHECK.SERVER_ERROR;
326                        getLogger().debug("An exception occurred while checking reference value {} at dataPath {} for content {}", referenceType + "#" + referenceValue, dataPath, content.getId(), e);
327                    }
328                    
329                    switch (check)
330                    {
331                        case SUCCESS:
332                            successCount++;
333                            break;
334                        case UNKNOWN:
335                            unknownCount++;
336                            break;
337                        case UNAUTHORIZED:
338                            unauthorizedCount++;
339                            break;
340                        case NOT_FOUND:
341                            notFoundCount++;
342                            break;
343                        case SERVER_ERROR:
344                        default:
345                            serverErrorCount++;
346                            break;
347                    }
348                }
349            }
350        }
351        
352        // Store the result
353        ContentConsistencyResult result = storeResult((WorkflowAwareContent) content, successCount, unknownCount, unauthorizedCount, notFoundCount, serverErrorCount);
354        
355        return result != null ? result.getId() : null;
356    }
357
358    /**
359     * Store the result of the consistency check done on the content
360     * @param content the content checked
361     * @param successCount the number of success
362     * @param unknownCount the number of unknown
363     * @param unauthorizedCount the number of unauthorized
364     * @param notFoundCount the number of not found
365     * @param serverErrorCount the number of server error
366     * @return the result
367     */
368    protected ContentConsistencyResult storeResult(WorkflowAwareContent content, int successCount, int unknownCount, int unauthorizedCount, int notFoundCount, int serverErrorCount)
369    {
370        // Retrieve a pre existing result for the content or create an new one
371        Optional<ContentConsistencyResult> previousResult = _getExistingResultForContent(content.getId());
372        if (unknownCount > 0 || unauthorizedCount > 0 || serverErrorCount > 0 || notFoundCount > 0)
373        {
374            ContentConsistencyResult result = previousResult.orElseGet(() -> _getResultsCollection().createChild("consistency-" + content.getName(), "ametys:consistencyResult"));
375            
376            Map<String, Object> values = new HashMap<>(12);
377            values.put(ContentConsistencyResult.CONTENT_ID, content.getId());
378            values.put(ContentConsistencyResult.TITLE, content.getTitle());
379            values.put(ContentConsistencyResult.CONTENT_TYPES, content.getTypes());
380            values.put(ContentConsistencyResult.LAST_CONTRIBUTOR, content.getLastContributor());
381            values.put(ContentConsistencyResult.WORKFLOW_STEP, content.getCurrentStepId());
382            values.put(ContentConsistencyResult.DATE, ZonedDateTime.now());
383            values.put(ContentConsistencyResult.NOT_FOUND, notFoundCount);
384            values.put(ContentConsistencyResult.SERVER_ERROR, serverErrorCount);
385            values.put(ContentConsistencyResult.SUCCESS, successCount);
386            values.put(ContentConsistencyResult.UNAUTHORIZED, unauthorizedCount);
387            values.put(ContentConsistencyResult.UNKNOWN, unknownCount);
388            
389            result.synchronizeValues(values);
390            
391            result.saveChanges();
392            return result;
393        }
394        // Remove old result if there is no error any more
395        else if (previousResult.isPresent())
396        {
397            ContentConsistencyResult result = previousResult.get();
398            ModifiableAmetysObject parent = result.getParent();
399            result.remove();
400            parent.saveChanges();
401        }
402        return null;
403    }
404
405    private Optional<ContentConsistencyResult> _getExistingResultForContent(String id)
406    {
407        String xPathQuery = QueryHelper.getXPathQuery(null, "ametys:consistencyResult", new StringExpression(ContentConsistencyResult.CONTENT_ID, Operator.EQ, id));
408        try (AmetysObjectIterable<ContentConsistencyResult> query = _resolver.query(xPathQuery))
409        {
410            return query.stream().findFirst();
411        }
412    }
413
414    private ModifiableTraversableAmetysObject _getResultsCollection()
415    {
416        ModifiableTraversableAmetysObject pluginsRoot = _resolver.resolveByPath("/ametys:plugins");
417        
418        ModifiableTraversableAmetysObject cmsNode = null;
419        if (pluginsRoot.hasChild("cms"))
420        {
421            cmsNode = (ModifiableTraversableAmetysObject) pluginsRoot.getChild("cms");
422        }
423        else
424        {
425            cmsNode = (ModifiableTraversableAmetysObject) pluginsRoot.createChild("cms", "ametys:unstructured");
426        }
427        
428        ModifiableTraversableAmetysObject resultsCollection = null;
429        if (cmsNode.hasChild("consistencyResults"))
430        {
431            resultsCollection = (ModifiableTraversableAmetysObject) cmsNode.getChild("consistencyResults");
432        }
433        else
434        {
435            resultsCollection = (ModifiableTraversableAmetysObject) cmsNode.createChild("consistencyResults", "ametys:collection");
436        }
437        
438        return resultsCollection;
439    }
440    
441    /**
442     * Get the contents with inconsistency information.
443     * @param filterExpression an expression to filter content to check, or null to check all contents
444     * @return an iterator on contents.
445     */
446    protected AmetysObjectIterable<Content> _getContents(Expression filterExpression)
447    {
448        String query = ContentQueryHelper.getContentXPathQuery(filterExpression != null ? new AndExpression(new ConsistencyExpression(), filterExpression) : new ConsistencyExpression());
449        return _resolver.query(query);
450    }
451
452    /**
453     * Expression which tests if contents have consistency informations.
454     */
455    public static class ConsistencyExpression implements Expression
456    {
457        public String build()
458        {
459            return new StringBuilder()
460            .append(RepositoryConstants.NAMESPACE_PREFIX_INTERNAL).append(':').append(DefaultContent.METADATA_ROOT_OUTGOING_REFERENCES)
461            .append('/').append(RepositoryConstants.NAMESPACE_PREFIX_INTERNAL).append(':').append(DefaultContent.METADATA_OUTGOING_REFERENCES)
462            .append("/*")
463            .append("/@").append(RepositoryConstants.NAMESPACE_PREFIX).append(':').append(DefaultContent.METADATA_OUTGOING_REFERENCE_PROPERTY)
464            .toString();
465        }
466    }
467}