001/*
002 *  Copyright 2018 Anyware Services
003 *
004 *  Licensed under the Apache License, Version 2.0 (the "License");
005 *  you may not use this file except in compliance with the License.
006 *  You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 *  Unless required by applicable law or agreed to in writing, software
011 *  distributed under the License is distributed on an "AS IS" BASIS,
012 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 *  See the License for the specific language governing permissions and
014 *  limitations under the License.
015 */
016package org.ametys.plugins.extraction.execution.pipeline;
017
018import java.io.File;
019import java.io.IOException;
020import java.nio.file.Files;
021import java.nio.file.Path;
022import java.time.Instant;
023import java.time.temporal.ChronoUnit;
024import java.util.ArrayList;
025import java.util.HashMap;
026import java.util.List;
027import java.util.Map;
028import java.util.Map.Entry;
029import java.util.function.Function;
030import java.util.stream.Stream;
031
032import org.apache.avalon.framework.activity.Disposable;
033import org.apache.avalon.framework.activity.Initializable;
034import org.apache.avalon.framework.component.Component;
035import org.apache.avalon.framework.configuration.Configuration;
036import org.apache.avalon.framework.configuration.ConfigurationException;
037import org.apache.avalon.framework.configuration.DefaultConfigurationBuilder;
038import org.apache.avalon.framework.container.ContainerUtil;
039import org.apache.avalon.framework.service.ServiceException;
040import org.apache.avalon.framework.service.ServiceManager;
041import org.apache.avalon.framework.service.Serviceable;
042import org.apache.commons.lang3.StringUtils;
043import org.apache.commons.lang3.tuple.Pair;
044import org.apache.excalibur.source.SourceResolver;
045import org.apache.excalibur.source.impl.FileSource;
046import org.xml.sax.SAXException;
047
048import org.ametys.core.cache.AbstractCacheManager;
049import org.ametys.core.cache.Cache;
050import org.ametys.core.ui.Callable;
051import org.ametys.core.util.LambdaUtils;
052import org.ametys.core.util.URIUtils;
053import org.ametys.plugins.extraction.ExtractionConstants;
054import org.ametys.plugins.extraction.execution.pipeline.impl.ConfigurablePipelineDescriptor;
055import org.ametys.plugins.extraction.execution.pipeline.impl.NoOpPipelineDescriptor;
056import org.ametys.runtime.i18n.I18nizableText;
057import org.ametys.runtime.plugin.component.AbstractLogEnabled;
058
059/**
060 * Manager of {@link PipelineDescriptor}s
061 */
062public class PipelineManager extends AbstractLogEnabled implements Component, Serviceable, Initializable, Disposable
063{
064    /** The Avalon role. */
065    public static final String ROLE = PipelineManager.class.getName();
066    
067    // '*' char is appended as it is a forbidden character for file names, it ensures the uniqueness of the id
068    private static final String __NO_OP_PIPELINE_ID = NoOpPipelineDescriptor.class.getName() + "*";
069    
070    private static final String __PIPELINE_DESCRIPTOR_CACHE = "plugin-extraction.PipelineDescriptor";
071    
072    private PipelineDescriptor _noOpPipelineDesc;
073    private Map<String, Long> _lastReading = new HashMap<>();
074
075    private AbstractCacheManager _cacheManager;
076    private SourceResolver _resolver;
077    private PipelineSerializerModelExtensionPoint _serializerEP;
078
079    
080    @Override
081    public void service(ServiceManager manager) throws ServiceException
082    {
083        _cacheManager = (AbstractCacheManager) manager.lookup(AbstractCacheManager.ROLE);
084        _resolver = (SourceResolver) manager.lookup(SourceResolver.ROLE);
085        _serializerEP = (PipelineSerializerModelExtensionPoint) manager.lookup(PipelineSerializerModelExtensionPoint.ROLE);
086    }
087    
088    @Override
089    public void initialize() throws Exception
090    {
091        PipelineSerializerModel xmlSerializer = _serializerEP.getExtension("xml");
092        _noOpPipelineDesc = new NoOpPipelineDescriptor(xmlSerializer);
093        
094        _cacheManager.createMemoryCache(__PIPELINE_DESCRIPTOR_CACHE,
095                new I18nizableText("plugin.extraction", "PLUGINS_EXTRACTION_CACHE_PIPELINE_DESCRIPTOR_LABEL"),
096                new I18nizableText("plugin.extraction", "PLUGINS_EXTRACTION_CACHE_PIPELINE_DESCRIPTOR_DESCRIPTION"),
097                true,
098                null);
099    }
100    
101    @Override
102    public void dispose()
103    {
104        _getPipelineDescriptorCache().invalidateAll();
105        _lastReading.clear();
106    }
107    
108    /**
109     * Returns <code>true</code> if the {@link PipelineDescriptor} for asked path exists
110     * @param path The path of the {@link PipelineDescriptor}
111     * @return <code>true</code> if the asked pipeline exists
112     * @throws IOException if an I/O exception occurred when accessing the root folder of pipeline files
113     */
114    public boolean has(String path) throws IOException
115    {
116        return get(path) != null; 
117    }
118    
119    /**
120     * Gets the {@link PipelineDescriptor} for the given path
121     * @param path The path of the pipeline
122     * @return the request {@link PipelineDescriptor}, or null if not found
123     * @throws IOException if an I/O exception occurred when accessing the root folder of pipeline files
124     */
125    public PipelineDescriptor get(Path path) throws IOException
126    {
127        return get(path.toString());
128    }
129    
130    /**
131     * Gets the {@link PipelineDescriptor} for the given path
132     * @param path The path of the pipeline
133     * @return the request {@link PipelineDescriptor}, or null if not found
134     * @throws IOException if an I/O exception occurred when accessing the root folder of pipeline files
135     */
136    public PipelineDescriptor get(String path) throws IOException
137    {
138        if (__NO_OP_PIPELINE_ID.equals(path))
139        {
140            return _noOpPipelineDesc;
141        }
142        return _readAndGetPipeline(path);
143    }
144    
145    /**
146     * Gets the available pipelines for the given extraction
147     * @param extractionId the extraction's definition file name
148     * @return the available pipelines
149     * @throws IOException if an I/O exception occurred when accessing the root folder of pipeline files
150     */
151    @Callable (rights = "Extraction_Rights_ExecuteExtraction")
152    public Map<String, Object> getAvailablePipelines(String extractionId) throws IOException
153    {
154        if (extractionId != null)
155        {
156            String decodedExtractionId = URIUtils.decode(extractionId);
157            List<Map<String, Object>> enumeration = _getJsonEnumeration(URIUtils.decode(decodedExtractionId));
158            return Map.of("pipelines", enumeration);
159        }
160           
161        return Map.of();
162    }
163    
164    private List<Map<String, Object>> _getJsonEnumeration(String extractionId) throws IOException
165    {
166        List<Map<String, Object>> enumeration = new ArrayList<>();
167        _readAll();
168        
169        for (Entry<String , PipelineDescriptor> entry : _getPipelineDescriptorCache().asMap().entrySet())
170        {
171            PipelineDescriptor pipeline = entry.getValue();
172            ExtractionMatcher matcher = pipeline.getExtractionMatcher();
173            if (matcher.isHandled(extractionId))
174            {
175                enumeration.add(Map.of(
176                        "value", entry.getKey(), 
177                        "text", pipeline.getLabel()));
178            }
179        }
180        
181        return enumeration;
182    }
183    
184    /**
185     * Gets the id of the default {@link PipelineDescriptor}
186     * @return the id of the default {@link PipelineDescriptor}
187     */
188    public String getDefaultPipeline()
189    {
190        return __NO_OP_PIPELINE_ID;
191    }
192    
193    private synchronized PipelineDescriptor _readAndGetPipeline(String path) throws IOException
194    {
195        Path pipelinePath = _absolutePath(path);
196        if (Files.exists(pipelinePath))
197        {
198            Pair<String, PipelineDescriptor> readPipeline = _readPipeline(pipelinePath);
199            if (readPipeline != null)
200            {
201                PipelineDescriptor pipeline = readPipeline.getRight();
202                String id = readPipeline.getLeft();
203                _getPipelineDescriptorCache().put(id, pipeline);
204                return pipeline;
205            }
206        }
207        return null;
208    }
209    
210    private synchronized void _readAll() throws IOException
211    {
212        _readFromFolder(_absolutePath(StringUtils.EMPTY))
213            .forEach(pair -> _getPipelineDescriptorCache().put(pair.getKey(), pair.getValue()));
214        
215        // Add the default pipeline
216        _getPipelineDescriptorCache().put(__NO_OP_PIPELINE_ID, _noOpPipelineDesc);
217    }
218    
219    private Path _absolutePath(String relativePath) throws IOException
220    {
221        FileSource pipelinesDirScr = null;
222        FileSource src = null;
223        try
224        {
225            pipelinesDirScr = (FileSource) _resolver.resolveURI(ExtractionConstants.PIPELINES_DIR);
226            pipelinesDirScr.getFile().mkdirs();
227            
228            src = (FileSource) _resolver.resolveURI(ExtractionConstants.PIPELINES_DIR + relativePath);
229            File file = src.getFile();
230            return file.toPath();
231        }
232        finally
233        {
234            _resolver.release(pipelinesDirScr);
235            _resolver.release(src);
236        }
237    }
238    
239    private Stream<Pair<String, PipelineDescriptor>> _readFromFolder(Path folderPath) throws IOException
240    {
241        return Files.list(folderPath)
242            .map(LambdaUtils.wrap(this::_readFromFile))
243            .flatMap(Function.identity());
244    }
245    
246    private Stream<Pair<String, PipelineDescriptor>> _readFromFile(Path path) throws IOException
247    {
248        return Files.isDirectory(path) ? _readFromFolder(path) : Stream.ofNullable(_readPipeline(path));
249    }
250    
251    private Pair<String, PipelineDescriptor> _readPipeline(Path pipelinePath) throws IOException
252    {
253        String id = _getPipelineIdFromAbsolutePath(pipelinePath);
254        File pipelineFile = pipelinePath.toFile();
255        long lastModified = (pipelineFile.lastModified() / 1000) * 1000; // second precision for Linux file systems 
256        if (lastModified > _lastReading.getOrDefault(id, -1L))
257        {
258            // cache is out of date => need to read
259            try
260            {
261                Configuration conf = new DefaultConfigurationBuilder().buildFromFile(pipelineFile);
262                PipelineDescriptor pipeline = new ConfigurablePipelineDescriptor(id, _resolver, _serializerEP);
263                ContainerUtil.configure(pipeline, conf);
264                return Pair.of(id, pipeline);
265            }
266            catch (ConfigurationException | SAXException | IOException e)
267            {
268                getLogger().error("File '{}' could not be parsed as an extraction pipeline.", pipelinePath, e);
269                return null;
270            }
271            finally
272            {
273                _lastReading.put(id, _now());
274            }
275        }
276        else
277        {
278            // cache is up to date
279            PipelineDescriptor pipeline = _getPipelineDescriptorCache().get(id);
280            return pipeline == null ? null : Pair.of(id, pipeline);
281        }
282    }
283    
284    private String _getPipelineIdFromAbsolutePath(Path pipelineAbsolutePath) throws IOException
285    {
286        FileSource pipelinesDirScr = null;
287        try
288        {
289            pipelinesDirScr = (FileSource) _resolver.resolveURI(ExtractionConstants.PIPELINES_DIR);
290            File pipelinesFile = pipelinesDirScr.getFile();
291            Path pipelineRelativePath = pipelinesFile.toPath().relativize(pipelineAbsolutePath);
292            return pipelineRelativePath.toString();
293        }
294        finally
295        {
296            _resolver.release(pipelinesDirScr);
297        }
298    }
299    
300    private static Long _now()
301    {
302        return Instant.now().truncatedTo(ChronoUnit.SECONDS).toEpochMilli();
303    }
304    
305    private Cache<String, PipelineDescriptor> _getPipelineDescriptorCache()
306    {
307        return _cacheManager.get(__PIPELINE_DESCRIPTOR_CACHE);
308    }
309}