// BigDataEngine.java
/**
* *****************************************************************************
* Copyright 2013 SEMOSS.ORG
*
* This file is part of SEMOSS.
*
* SEMOSS is free software: you can redistribute it and/or modify it under the
* terms of the GNU General Public License as published by the Free Software
* Foundation, either version 3 of the License, or (at your option) any later
* version.
*
* SEMOSS is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
* A PARTICULAR PURPOSE. See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with
* SEMOSS. If not, see <http://www.gnu.org/licenses/>.
* ****************************************************************************
*/
package com.ostrichemulators.semtool.rdf.engine.impl;
import com.bigdata.journal.ITx;
import com.bigdata.journal.Journal;
import org.openrdf.repository.RepositoryException;
import com.ostrichemulators.semtool.util.Constants;
import com.bigdata.rdf.rules.InferenceEngine;
import com.bigdata.rdf.sail.BigdataSail;
import com.bigdata.rdf.sail.BigdataSailRepository;
import com.bigdata.rdf.sail.BigdataSailRepositoryConnection;
import com.bigdata.rdf.sail.CreateKBTask;
import com.bigdata.rdf.store.AbstractTripleStore;
import com.bigdata.rdf.task.AbstractApiTask;
import com.ostrichemulators.semtool.rdf.engine.api.InsightManager;
import java.io.File;
import java.util.List;
import java.util.Properties;
import org.apache.log4j.Logger;
import org.openrdf.model.Statement;
import com.ostrichemulators.semtool.rdf.engine.util.EngineManagementException;
import com.ostrichemulators.semtool.rdf.engine.util.StatementSorter;
import com.ostrichemulators.semtool.user.LocalUserImpl;
import com.ostrichemulators.semtool.user.Security;
import com.ostrichemulators.semtool.user.User;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import org.apache.commons.io.FileUtils;
import org.openrdf.http.protocol.UnauthorizedException;
import org.openrdf.repository.RepositoryConnection;
import org.openrdf.rio.turtle.TurtleWriter;
/**
 * Big data engine serves to connect the .jnl files, which contain the RDF
 * database, to the java engine.
 *
 * A single {@link Journal} backs two knowledge bases: the main one (namespace
 * "kb") and the insights one (namespace {@link Constants#INSIGHTKB}). The
 * engine keeps one connection ({@code rc}) open on the main KB and only opens
 * a connection on the insights KB while reading or rewriting insights.
 */
public class BigDataEngine extends AbstractSesameEngine {

  private static final Logger log = Logger.getLogger( BigDataEngine.class );

  /** Namespace of the main knowledge base inside the journal. */
  private static final String MAIN_NAMESPACE = "kb";

  private Journal journal = null;
  private BigdataSailRepository repo = null;
  private BigdataSailRepositoryConnection rc = null;
  private BigdataSail sail = null;
  private BigdataSailRepository insightrepo = null;
  private InsightManagerImpl insightEngine = null;
  // true once setInsightManager() has been called; updateInsights() then
  // skips rewriting the on-disk insights KB
  private boolean externallySetInsightManager = false;

  /**
   * Creates an engine that is not yet attached to any journal.
   */
  public BigDataEngine() {
  }

  /**
   * Creates an engine and immediately opens the given journal file.
   *
   * @param jnl the journal file to open
   *
   * @throws RepositoryException if the database cannot be opened
   */
  public BigDataEngine( File jnl ) throws RepositoryException {
    openDB( generateProperties( jnl ) );
  }

  /**
   * Opens the journal, creates (or locates) the main and insights knowledge
   * bases, and opens this engine's connection to the main KB.
   *
   * @param props the engine properties; only keys starting with "com.bigdata"
   * are handed to the journal
   *
   * @throws RepositoryException if either KB cannot be created or initialized,
   * or if the calling thread is interrupted while waiting for KB creation
   * @throws UnsupportedOperationException if the properties request a remote
   * repository
   */
  @Override
  protected void createRc( Properties props ) throws RepositoryException {
    Properties rws = getRWSProperties( props );
    boolean isremote
        = Boolean.parseBoolean( props.getProperty( REMOTE_KEY, "false" ) );

    if ( isremote ) {
      String url = props.getProperty( REPOSITORY_KEY );
      String ins = props.getProperty( INSIGHTS_KEY );
      log.debug( "big data remote! " + url + " ... " + ins );
      throw new UnsupportedOperationException( "Remote Bigdata repositories are not yet supported" );
    }

    // users have full access to local DBs
    Security.getSecurity().associateUser( this, new LocalUserImpl() );

    // the journal is the file itself
    journal = new Journal( rws );

    boolean opened = false;
    try {
      // the main KB
      sail = new BigdataSail( createKb( MAIN_NAMESPACE, rws ) );
      repo = new BigdataSailRepository( sail );
      repo.initialize();

      // the insights KB
      BigdataSail insightSail
          = new BigdataSail( createKb( Constants.INSIGHTKB, rws ) );
      insightrepo = new BigdataSailRepository( insightSail );
      insightrepo.initialize();

      rc = repo.getConnection();
      opened = true;
    }
    catch ( InterruptedException e ) {
      // restore the interrupt flag so callers further up can see it
      Thread.currentThread().interrupt();
      log.fatal( e, e );
      throw new RepositoryException( "interrupted while opening knowledge bases", e );
    }
    catch ( ExecutionException e ) {
      log.fatal( e, e );
      throw new RepositoryException( "could not create knowledge bases", e );
    }
    finally {
      if ( !opened ) {
        // don't keep the journal file open if we couldn't finish opening
        releaseJournal();
      }
    }
  }

  /**
   * Runs a {@link CreateKBTask} for the given namespace on this engine's
   * journal, then locates the resulting triple store.
   *
   * @param namespace the KB namespace
   * @param rws the bigdata properties; the namespace option is set on this
   * object before the task is created
   *
   * @return the unisolated view of the namespace's triple store
   *
   * @throws InterruptedException if interrupted while waiting for the task
   * @throws ExecutionException if the task itself fails
   */
  private AbstractTripleStore createKb( String namespace, Properties rws )
      throws InterruptedException, ExecutionException {
    rws.setProperty( BigdataSail.Options.NAMESPACE, namespace );
    CreateKBTask ctor = new CreateKBTask( namespace, rws );
    AbstractApiTask.submitApiTask( journal, ctor ).get();

    return AbstractTripleStore.class.cast( journal.getResourceLocator().
        locate( namespace, ITx.UNISOLATED ) );
  }

  /**
   * Closes the journal if one is open, logging (not propagating) any failure.
   */
  private void releaseJournal() {
    if ( null == journal ) {
      return;
    }

    try {
      journal.close();
    }
    catch ( Exception e1 ) {
      log.warn( "could not close journal file", e1 );
    }
    finally {
      journal = null;
    }
  }

  /**
   * Gets this engine's connection to the main KB.
   *
   * @return the current connection, or null if the engine has not been opened
   */
  @Override
  protected RepositoryConnection getRawConnection() {
    return rc;
  }

  /**
   * Sets the insight manager and remembers that it was supplied by the caller,
   * so that {@link #updateInsights(InsightManager)} no longer rewrites the
   * on-disk insights KB.
   *
   * @param ie the insight manager to use
   */
  @Override
  public void setInsightManager( InsightManager ie ) {
    super.setInsightManager( ie );
    externallySetInsightManager = true;
  }

  /**
   * Creates a new {@link InsightManagerImpl} and loads it from a read-only
   * connection to the journal-based insights KB.
   *
   * @return the newly-loaded insight manager
   *
   * @throws RepositoryException if the insights KB cannot be read
   */
  @Override
  protected InsightManager createInsightManager() throws RepositoryException {
    insightEngine = new InsightManagerImpl();

    BigdataSailRepositoryConnection insightrc = null;
    try {
      insightrc = insightrepo.getReadOnlyConnection();
      log.debug( "loading on-disk insights stmts: " + insightrc.size() );
      insightEngine.loadFromRepository( insightrc );
    }
    finally {
      if ( null != insightrc ) {
        try {
          insightrc.close();
        }
        catch ( RepositoryException re ) {
          log.warn( re, re );
        }
      }
    }

    return insightEngine;
  }

  /**
   * Replaces the contents of the on-disk insights KB with the statements of
   * the given insight manager.
   *
   * @param im the source of the insight statements
   *
   * @throws RepositoryException if the main connection cannot be committed and
   * closed, or if the insights KB cannot be rewritten
   */
  private void copyInsightsToDisk( InsightManager im ) throws RepositoryException {
    // this function is a bit tricky...we want to:
    // 1) commit then close this engine's write-handle on the main KB
    // 2) open it on the Insights KB
    // 3) rewrite everything to the Insights KB
    // 4) close the Insights write handle
    // 5) re-open the write handle to the main KB

    List<Statement> stmts = new ArrayList<>();
    User user = Security.getSecurity().getAssociatedUser( this );
    stmts.addAll( InsightManagerImpl.getModel( im, user ) );

    // sort the statements so a later export looks nice (totally unnecessary,
    // but troubleshooting is easier)
    Collections.sort( stmts, new StatementSorter() );

    try {
      // 1
      rc.commit();
      rc.close();
    }
    catch ( Exception e ) {
      log.error( "unable to prepare repository for insights management", e );
      throw e;
    }

    BigdataSailRepositoryConnection repoc = null;
    try {
      // 2
      repoc = insightrepo.getConnection();

      // 3
      log.debug( "writing " + stmts.size() + " statements to on-disk insight kb" );
      repoc.begin();
      repoc.clear();
      repoc.add( stmts );
      repoc.commit();

      // 4
      repoc.close();
      repoc = null;

      logProvenance( stmts );
    }
    finally {
      // if step 3 or 4 failed, the insights handle is still open; it must be
      // discarded before we try to get the main handle back
      discardInsightConnection( repoc );

      try {
        // 5
        rc = repo.getConnection();
      }
      catch ( Exception e ) {
        log.error( e, e );
      }
    }

    if ( log.isTraceEnabled() ) {
      File dumpfile
          = new File( FileUtils.getTempDirectory(), "semtool-outsights-committed.ttl" );
      try ( Writer w = new BufferedWriter( new FileWriter( dumpfile ) ) ) {
        TurtleWriter tw = new TurtleWriter( w );
        // the writer rejects statements unless the document has been started
        tw.startRDF();
        for ( Statement s : stmts ) {
          tw.handleStatement( s );
        }
        tw.endRDF();
      }
      catch ( Exception ioe ) {
        log.warn( ioe, ioe );
      }
    }
  }

  /**
   * Rolls back and closes a leftover insights connection, logging (not
   * propagating) any failure so the caller's original exception survives.
   *
   * @param conn the connection to discard; null means nothing to do
   */
  private void discardInsightConnection( RepositoryConnection conn ) {
    if ( null == conn ) {
      return;
    }

    try {
      conn.rollback();
    }
    catch ( Exception e ) {
      log.warn( e, e );
    }

    try {
      conn.close();
    }
    catch ( Exception e ) {
      log.warn( e, e );
    }
  }

  /**
   * Pushes the given insight manager's perspectives into this engine. Unless
   * the insight manager was set externally, the on-disk insights KB is
   * rewritten first.
   *
   * @param im the insight manager holding the new insights
   *
   * @throws EngineManagementException with ACCESS_DENIED if the operation is
   * unauthorized, or UNKNOWN for any other repository failure
   */
  @Override
  public void updateInsights( InsightManager im ) throws EngineManagementException {
    try {
      if ( !externallySetInsightManager ) {
        copyInsightsToDisk( im );
      }

      // insightEngine is only assigned by createInsightManager(), which may
      // never have run (e.g. when the insight manager was set externally)
      if ( null != insightEngine ) {
        insightEngine.addAll( im.getPerspectives(), true );
      }
    }
    catch ( UnauthorizedException ue ) {
      throw new EngineManagementException(
          EngineManagementException.ErrorCode.ACCESS_DENIED, ue );
    }
    catch ( RepositoryException re ) {
      throw new EngineManagementException(
          EngineManagementException.ErrorCode.UNKNOWN, re );
    }
  }

  /**
   * Closes the data base associated with the engine. This will prevent further
   * changes from being made in the data store and safely ends the active
   * transactions and closes the engine.
   */
  @Override
  public void closeDB() {
    super.closeDB();
    releaseJournal();
  }

  /**
   * Computes the inference closure on the main KB, updates the last-modified
   * date, and commits. A {@link RepositoryException} raised along the way is
   * logged rather than propagated.
   *
   * @throws RepositoryException declared for the overridden signature
   */
  @Override
  public void calculateInferences() throws RepositoryException {
    try {
      log.debug( "start calculating inferences" );
      InferenceEngine ie = sail.getInferenceEngine();
      ie.computeClosure( null );
      updateLastModifiedDate();
      rc.commit();
      log.debug( "done calculating inferences" );
    }
    catch ( RepositoryException e ) {
      log.error( e, e );
    }
  }

  /**
   * Generates the minimal set of properties needed to open the given journal
   * with this engine class.
   *
   * @param jnl the journal file
   *
   * @return properties naming the journal file, this engine implementation,
   * and SMSS version "1.0"
   */
  public static Properties generateProperties( File jnl ) {
    Properties props = new Properties();
    props.setProperty( Constants.SMSS_LOCATION, jnl.toString() );
    props.setProperty( BigdataSail.Options.FILE, jnl.toString() );
    props.setProperty( Constants.ENGINE_IMPL,
        BigDataEngine.class.getCanonicalName() );
    props.setProperty( Constants.SMSS_VERSION_KEY, "1.0" );

    return props;
  }

  /**
   * Gets the bigdata-specific properties
   *
   * @param prop all the properties to look through
   *
   * @return a new Properties object holding only the entries whose key starts
   * with "com.bigdata"
   */
  private Properties getRWSProperties( Properties prop ) {
    Properties rws = new Properties();
    for ( String key : prop.stringPropertyNames() ) {
      if ( key.startsWith( "com.bigdata" ) ) {
        String val = prop.getProperty( key );
        rws.setProperty( key, val );
      }
    }

    return rws;
  }
}