001/*
002 *  Copyright 2018 Anyware Services
003 *
004 *  Licensed under the Apache License, Version 2.0 (the "License");
005 *  you may not use this file except in compliance with the License.
006 *  You may obtain a copy of the License at
007 *
008 *      http://www.apache.org/licenses/LICENSE-2.0
009 *
010 *  Unless required by applicable law or agreed to in writing, software
011 *  distributed under the License is distributed on an "AS IS" BASIS,
012 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 *  See the License for the specific language governing permissions and
014 *  limitations under the License.
015 */
016package org.ametys.plugins.extraction.execution.pipeline;
017
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.avalon.framework.activity.Disposable;
import org.apache.avalon.framework.activity.Initializable;
import org.apache.avalon.framework.component.Component;
import org.apache.avalon.framework.configuration.Configuration;
import org.apache.avalon.framework.configuration.ConfigurationException;
import org.apache.avalon.framework.configuration.DefaultConfigurationBuilder;
import org.apache.avalon.framework.container.ContainerUtil;
import org.apache.avalon.framework.service.ServiceException;
import org.apache.avalon.framework.service.ServiceManager;
import org.apache.avalon.framework.service.Serviceable;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.excalibur.source.SourceResolver;
import org.apache.excalibur.source.impl.FileSource;
import org.xml.sax.SAXException;

import org.ametys.plugins.extraction.ExtractionConstants;
import org.ametys.plugins.extraction.execution.pipeline.impl.ConfigurablePipelineDescriptor;
import org.ametys.plugins.extraction.execution.pipeline.impl.NoOpPipelineDescriptor;
import org.ametys.runtime.i18n.I18nizableText;
import org.ametys.runtime.plugin.component.AbstractLogEnabled;

import com.google.common.collect.ImmutableMap;
052
053/**
054 * Manager of {@link PipelineDescriptor}s
055 */
056public class PipelineManager extends AbstractLogEnabled implements Component, Serviceable, Initializable, Disposable
057{
058    /** The Avalon role. */
059    public static final String ROLE = PipelineManager.class.getName();
060    
061    // '*' char is appended as it is a forbidden character for file names, it ensures the uniqueness of the id
062    private static final String __NO_OP_PIPELINE_ID = NoOpPipelineDescriptor.class.getName() + "*";
063    
064    private PipelineDescriptor _noOpPipelineDesc;
065    private Map<String, PipelineDescriptor> _pipelineDescCache = new HashMap<>();
066    private Map<String, Long> _lastReading = new HashMap<>();
067
068    private SourceResolver _resolver;
069    private PipelineSerializerModelExtensionPoint _serializerEP;
070    
071    @Override
072    public void service(ServiceManager manager) throws ServiceException
073    {
074        _resolver = (SourceResolver) manager.lookup(SourceResolver.ROLE);
075        _serializerEP = (PipelineSerializerModelExtensionPoint) manager.lookup(PipelineSerializerModelExtensionPoint.ROLE);
076    }
077    
078    @Override
079    public void initialize() throws Exception
080    {
081        PipelineSerializerModel xmlSerializer = _serializerEP.getExtension("xml");
082        _noOpPipelineDesc = new NoOpPipelineDescriptor(xmlSerializer);
083    }
084    
085    @Override
086    public void dispose()
087    {
088        _pipelineDescCache.clear();
089        _lastReading.clear();
090    }
091    
092    /**
093     * Returns <code>true</code> if the {@link PipelineDescriptor} for asked id exists
094     * @param id The id of the {@link PipelineDescriptor}
095     * @return <code>true</code> if the asked pipeline exists
096     * @throws IOException if an I/O exception occured when accessing the root folder of pipeline files
097     */
098    public boolean has(String id) throws IOException
099    {
100        return get(id) != null; 
101    }
102    
103    /**
104     * Gets the {@link PipelineDescriptor} for the given id
105     * @param id The id of the {@link PipelineDescriptor}
106     * @return the request {@link PipelineDescriptor}, or null if not found
107     * @throws IOException if an I/O exception occured when accessing the root folder of pipeline files
108     */
109    public PipelineDescriptor get(String id) throws IOException
110    {
111        if (__NO_OP_PIPELINE_ID.equals(id))
112        {
113            return _noOpPipelineDesc;
114        }
115        return _readAndGetPipeline(id);
116    }
117    
118    /**
119     * Gets all the {@link PipelineDescriptor}s labels by id
120     * @return the {@link PipelineDescriptor}s labels by id
121     * @throws IOException if an I/O exception occured when accessing the root folder of pipeline files
122     */
123    public Map<String, I18nizableText> getEnumeration() throws IOException
124    {
125        Map<String, I18nizableText> enumeration = new HashMap<>();
126        _readAll();
127        
128        for (String pipelineId : _pipelineDescCache.keySet())
129        {
130            PipelineDescriptor pipeline = _pipelineDescCache.get(pipelineId);
131            enumeration.put(pipelineId, pipeline.getLabel());
132        }
133        
134        return enumeration;
135    }
136    
137    /**
138     * Gets all the {@link PipelineDescriptor}s as JSON
139     * @return the {@link PipelineDescriptor}s as JSON
140     * @throws IOException if an I/O exception occured when accessing the root folder of pipeline files
141     */
142    public List<Map<String, Object>> getJsonEnumeration() throws IOException
143    {
144        return getEnumeration().entrySet()
145                .stream()
146                .map(entry -> ImmutableMap.of("value", entry.getKey(), "label", entry.getValue()))
147                .collect(Collectors.toList());
148    }
149    
150    /**
151     * Gets the id of the default {@link PipelineDescriptor}
152     * @return the id of the default {@link PipelineDescriptor}
153     */
154    public String getDefaultPipeline()
155    {
156        return __NO_OP_PIPELINE_ID;
157    }
158    
159    private synchronized PipelineDescriptor _readAndGetPipeline(String id) throws IOException
160    {
161        Path pipelinePath = _absolutePath(id);
162        if (Files.exists(pipelinePath))
163        {
164            Pair<String, PipelineDescriptor> readPipeline = _readPipeline(pipelinePath);
165            if (readPipeline != null)
166            {
167                PipelineDescriptor pipeline = readPipeline.getRight();
168                _pipelineDescCache.put(id, pipeline);
169                return pipeline;
170            }
171        }
172        return null;
173    }
174    
175    private synchronized void _readAll() throws IOException
176    {
177        Path pipelinesPath = _absolutePath("");
178        
179        Map<String, PipelineDescriptor> pipelines = new HashMap<>();
180        Files.list(pipelinesPath)
181                .map(this::_readPipeline)
182                .filter(Objects::nonNull)
183                .forEach(p -> pipelines.put(p.getLeft(), p.getRight()));
184        
185        pipelines.put(__NO_OP_PIPELINE_ID, _noOpPipelineDesc);
186        
187        _pipelineDescCache = pipelines;
188    }
189    
190    private Path _absolutePath(String relativePath) throws IOException
191    {
192        FileSource pipelinesDirScr = null;
193        FileSource src = null;
194        try
195        {
196            pipelinesDirScr = (FileSource) _resolver.resolveURI(ExtractionConstants.PIPELINES_DIR);
197            pipelinesDirScr.getFile().mkdirs();
198            
199            src = (FileSource) _resolver.resolveURI(ExtractionConstants.PIPELINES_DIR + relativePath);
200            File file = src.getFile();
201            return file.toPath();
202        }
203        finally
204        {
205            _resolver.release(pipelinesDirScr);
206            _resolver.release(src);
207        }
208    }
209    
210    private Pair<String, PipelineDescriptor> _readPipeline(Path pipelinePath)
211    {
212        String id = pipelinePath.getFileName().toString();
213        File pipelineFile = pipelinePath.toFile();
214        long lastModified = (pipelineFile.lastModified() / 1000) * 1000; // second precision for Linux file systems 
215        if (lastModified > _lastReading.getOrDefault(id, -1L))
216        {
217            // cache is out of date => need to read
218            try
219            {
220                Configuration conf = new DefaultConfigurationBuilder().buildFromFile(pipelineFile);
221                PipelineDescriptor pipeline = new ConfigurablePipelineDescriptor(id, _resolver, _serializerEP);
222                ContainerUtil.configure(pipeline, conf);
223                return Pair.of(id, pipeline);
224            }
225            catch (ConfigurationException | SAXException | IOException e)
226            {
227                getLogger().error("File '{}' could not be parsed as an extraction pipeline.", pipelinePath, e);
228                return null;
229            }
230            finally
231            {
232                _lastReading.put(id, _now());
233            }
234        }
235        else
236        {
237            // cache is up to date
238            PipelineDescriptor pipeline = _pipelineDescCache.get(id);
239            return pipeline == null ? null : Pair.of(id, pipeline);
240        }
241    }
242    
243    private static Long _now()
244    {
245        return Instant.now().truncatedTo(ChronoUnit.SECONDS).toEpochMilli();
246    }
247}