001/*
002 *  Copyright 2018 Anyware Services
003 *
004 *  Licensed under the Apache License, Version 2.0 (the "License");
005 *  you may not use this file except in compliance with the License.
006 *  You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 *  Unless required by applicable law or agreed to in writing, software
011 *  distributed under the License is distributed on an "AS IS" BASIS,
012 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 *  See the License for the specific language governing permissions and
014 *  limitations under the License.
015 */
016package org.ametys.plugins.extraction.execution.pipeline;
017
018import java.io.File;
019import java.io.IOException;
020import java.nio.file.Files;
021import java.nio.file.Path;
022import java.time.Instant;
023import java.time.temporal.ChronoUnit;
024import java.util.ArrayList;
025import java.util.HashMap;
026import java.util.List;
027import java.util.Map;
028import java.util.function.Function;
029import java.util.stream.Stream;
030
031import org.apache.avalon.framework.activity.Disposable;
032import org.apache.avalon.framework.activity.Initializable;
033import org.apache.avalon.framework.component.Component;
034import org.apache.avalon.framework.configuration.Configuration;
035import org.apache.avalon.framework.configuration.ConfigurationException;
036import org.apache.avalon.framework.configuration.DefaultConfigurationBuilder;
037import org.apache.avalon.framework.container.ContainerUtil;
038import org.apache.avalon.framework.service.ServiceException;
039import org.apache.avalon.framework.service.ServiceManager;
040import org.apache.avalon.framework.service.Serviceable;
041import org.apache.commons.lang3.StringUtils;
042import org.apache.commons.lang3.tuple.Pair;
043import org.apache.excalibur.source.SourceResolver;
044import org.apache.excalibur.source.impl.FileSource;
045import org.xml.sax.SAXException;
046
047import org.ametys.core.ui.Callable;
048import org.ametys.core.util.LambdaUtils;
049import org.ametys.core.util.URIUtils;
050import org.ametys.plugins.extraction.ExtractionConstants;
051import org.ametys.plugins.extraction.execution.pipeline.impl.ConfigurablePipelineDescriptor;
052import org.ametys.plugins.extraction.execution.pipeline.impl.NoOpPipelineDescriptor;
053import org.ametys.runtime.plugin.component.AbstractLogEnabled;
054
055/**
056 * Manager of {@link PipelineDescriptor}s
057 */
058public class PipelineManager extends AbstractLogEnabled implements Component, Serviceable, Initializable, Disposable
059{
060    /** The Avalon role. */
061    public static final String ROLE = PipelineManager.class.getName();
062    
063    // '*' char is appended as it is a forbidden character for file names, it ensures the uniqueness of the id
064    private static final String __NO_OP_PIPELINE_ID = NoOpPipelineDescriptor.class.getName() + "*";
065    
066    private PipelineDescriptor _noOpPipelineDesc;
067    private Map<String, PipelineDescriptor> _pipelineDescCache = new HashMap<>();
068    private Map<String, Long> _lastReading = new HashMap<>();
069
070    private SourceResolver _resolver;
071    private PipelineSerializerModelExtensionPoint _serializerEP;
072    
073    @Override
074    public void service(ServiceManager manager) throws ServiceException
075    {
076        _resolver = (SourceResolver) manager.lookup(SourceResolver.ROLE);
077        _serializerEP = (PipelineSerializerModelExtensionPoint) manager.lookup(PipelineSerializerModelExtensionPoint.ROLE);
078    }
079    
080    @Override
081    public void initialize() throws Exception
082    {
083        PipelineSerializerModel xmlSerializer = _serializerEP.getExtension("xml");
084        _noOpPipelineDesc = new NoOpPipelineDescriptor(xmlSerializer);
085    }
086    
087    @Override
088    public void dispose()
089    {
090        _pipelineDescCache.clear();
091        _lastReading.clear();
092    }
093    
094    /**
095     * Returns <code>true</code> if the {@link PipelineDescriptor} for asked path exists
096     * @param path The path of the {@link PipelineDescriptor}
097     * @return <code>true</code> if the asked pipeline exists
098     * @throws IOException if an I/O exception occurred when accessing the root folder of pipeline files
099     */
100    public boolean has(String path) throws IOException
101    {
102        return get(path) != null; 
103    }
104    
105    /**
106     * Gets the {@link PipelineDescriptor} for the given path
107     * @param path The path of the pipeline
108     * @return the request {@link PipelineDescriptor}, or null if not found
109     * @throws IOException if an I/O exception occurred when accessing the root folder of pipeline files
110     */
111    public PipelineDescriptor get(Path path) throws IOException
112    {
113        return get(path.toString());
114    }
115    
116    /**
117     * Gets the {@link PipelineDescriptor} for the given path
118     * @param path The path of the pipeline
119     * @return the request {@link PipelineDescriptor}, or null if not found
120     * @throws IOException if an I/O exception occurred when accessing the root folder of pipeline files
121     */
122    public PipelineDescriptor get(String path) throws IOException
123    {
124        if (__NO_OP_PIPELINE_ID.equals(path))
125        {
126            return _noOpPipelineDesc;
127        }
128        return _readAndGetPipeline(path);
129    }
130    
131    /**
132     * Gets the available pipelines for the given extraction
133     * @param extractionId the extraction's definition file name
134     * @return the available pipelines
135     * @throws IOException if an I/O exception occurred when accessing the root folder of pipeline files
136     */
137    @Callable (right = "Extraction_Rights_ExecuteExtraction")
138    public Map<String, Object> getAvailablePipelines(String extractionId) throws IOException
139    {
140        if (extractionId != null)
141        {
142            String decodedExtractionId = URIUtils.decode(extractionId);
143            List<Map<String, Object>> enumeration = _getJsonEnumeration(URIUtils.decode(decodedExtractionId));
144            return Map.of("pipelines", enumeration);
145        }
146           
147        return Map.of();
148    }
149    
150    private List<Map<String, Object>> _getJsonEnumeration(String extractionId) throws IOException
151    {
152        List<Map<String, Object>> enumeration = new ArrayList<>();
153        _readAll();
154        
155        for (String pipelineId : _pipelineDescCache.keySet())
156        {
157            PipelineDescriptor pipeline = _pipelineDescCache.get(pipelineId);
158            ExtractionMatcher matcher = pipeline.getExtractionMatcher();
159            if (matcher.isHandled(extractionId))
160            {
161                enumeration.add(Map.of(
162                        "value", pipelineId, 
163                        "text", pipeline.getLabel()));
164            }
165        }
166        
167        return enumeration;
168    }
169    
170    /**
171     * Gets the id of the default {@link PipelineDescriptor}
172     * @return the id of the default {@link PipelineDescriptor}
173     */
174    public String getDefaultPipeline()
175    {
176        return __NO_OP_PIPELINE_ID;
177    }
178    
179    private synchronized PipelineDescriptor _readAndGetPipeline(String path) throws IOException
180    {
181        Path pipelinePath = _absolutePath(path);
182        if (Files.exists(pipelinePath))
183        {
184            Pair<String, PipelineDescriptor> readPipeline = _readPipeline(pipelinePath);
185            if (readPipeline != null)
186            {
187                PipelineDescriptor pipeline = readPipeline.getRight();
188                String id = readPipeline.getLeft();
189                _pipelineDescCache.put(id, pipeline);
190                return pipeline;
191            }
192        }
193        return null;
194    }
195    
196    private synchronized void _readAll() throws IOException
197    {
198        _readFromFolder(_absolutePath(StringUtils.EMPTY))
199            .forEach(pair -> _pipelineDescCache.put(pair.getKey(), pair.getValue()));
200        
201        // Add the default pipeline
202        _pipelineDescCache.put(__NO_OP_PIPELINE_ID, _noOpPipelineDesc);
203    }
204    
205    private Path _absolutePath(String relativePath) throws IOException
206    {
207        FileSource pipelinesDirScr = null;
208        FileSource src = null;
209        try
210        {
211            pipelinesDirScr = (FileSource) _resolver.resolveURI(ExtractionConstants.PIPELINES_DIR);
212            pipelinesDirScr.getFile().mkdirs();
213            
214            src = (FileSource) _resolver.resolveURI(ExtractionConstants.PIPELINES_DIR + relativePath);
215            File file = src.getFile();
216            return file.toPath();
217        }
218        finally
219        {
220            _resolver.release(pipelinesDirScr);
221            _resolver.release(src);
222        }
223    }
224    
225    private Stream<Pair<String, PipelineDescriptor>> _readFromFolder(Path folderPath) throws IOException
226    {
227        return Files.list(folderPath)
228            .map(LambdaUtils.wrap(this::_readFromFile))
229            .flatMap(Function.identity());
230    }
231    
232    private Stream<Pair<String, PipelineDescriptor>> _readFromFile(Path path) throws IOException
233    {
234        return Files.isDirectory(path) ? _readFromFolder(path) : Stream.ofNullable(_readPipeline(path));
235    }
236    
237    private Pair<String, PipelineDescriptor> _readPipeline(Path pipelinePath) throws IOException
238    {
239        String id = _getPipelineIdFromAbsolutePath(pipelinePath);
240        File pipelineFile = pipelinePath.toFile();
241        long lastModified = (pipelineFile.lastModified() / 1000) * 1000; // second precision for Linux file systems 
242        if (lastModified > _lastReading.getOrDefault(id, -1L))
243        {
244            // cache is out of date => need to read
245            try
246            {
247                Configuration conf = new DefaultConfigurationBuilder().buildFromFile(pipelineFile);
248                PipelineDescriptor pipeline = new ConfigurablePipelineDescriptor(id, _resolver, _serializerEP);
249                ContainerUtil.configure(pipeline, conf);
250                return Pair.of(id, pipeline);
251            }
252            catch (ConfigurationException | SAXException | IOException e)
253            {
254                getLogger().error("File '{}' could not be parsed as an extraction pipeline.", pipelinePath, e);
255                return null;
256            }
257            finally
258            {
259                _lastReading.put(id, _now());
260            }
261        }
262        else
263        {
264            // cache is up to date
265            PipelineDescriptor pipeline = _pipelineDescCache.get(id);
266            return pipeline == null ? null : Pair.of(id, pipeline);
267        }
268    }
269    
270    private String _getPipelineIdFromAbsolutePath(Path pipelineAbsolutePath) throws IOException
271    {
272        FileSource pipelinesDirScr = null;
273        try
274        {
275            pipelinesDirScr = (FileSource) _resolver.resolveURI(ExtractionConstants.PIPELINES_DIR);
276            File pipelinesFile = pipelinesDirScr.getFile();
277            Path pipelineRelativePath = pipelinesFile.toPath().relativize(pipelineAbsolutePath);
278            return pipelineRelativePath.toString();
279        }
280        finally
281        {
282            _resolver.release(pipelinesDirScr);
283        }
284    }
285    
286    private static Long _now()
287    {
288        return Instant.now().truncatedTo(ChronoUnit.SECONDS).toEpochMilli();
289    }
290}