DiskBackedLoadingSheetData.java
package com.ostrichemulators.semtool.poi.main;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.ostrichemulators.semtool.util.RDFDatatypeTools;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Logger;
import org.openrdf.model.Literal;
import org.openrdf.model.URI;
import org.openrdf.model.Value;
import org.openrdf.model.ValueFactory;
import org.openrdf.model.impl.ValueFactoryImpl;
/**
* A class to encapsulate relationship loading sheet information, backed by a
* temporary file on disk so very large sheets need not be held entirely in
* memory.
*
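* A minimal usage sketch ({@code model} here is assumed to be an existing,
* populated in-memory {@link LoadingSheetData}):
* <pre>{@code
* DiskBackedLoadingSheetData disklsd = new DiskBackedLoadingSheetData( model );
* disklsd.setMaxElementsInMemory( 50000 );
* disklsd.finishLoading(); // flush any remaining naps to the backing file
* DataIterator it = disklsd.iterator();
* while ( it.hasNext() ) {
*   LoadingNodeAndPropertyValues nap = it.next();
*   // process each nap
* }
* disklsd.release(); // deletes the temporary backing file
* }</pre>
*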
* @author ryan
*/
public class DiskBackedLoadingSheetData extends LoadingSheetData {
private static final Logger log = Logger.getLogger( DiskBackedLoadingSheetData.class );
private static final int COMMITLIMIT = 100000;
private static final int READCACHELIMIT = 10000;
private File backingfile;
private long opsSinceLastCommit = 0;
private int datacount = 0;
private final Set<LoadingNodeAndPropertyValues> removedNodes = new HashSet<>();
private final ObjectMapper oxm = new ObjectMapper();
private int commitlimit = COMMITLIMIT;
private int readcachelimit = READCACHELIMIT;
protected DiskBackedLoadingSheetData( String tabtitle, String type ) throws IOException {
this( tabtitle, type, new HashMap<>() );
}
protected DiskBackedLoadingSheetData( String tabtitle, String type,
Map<String, URI> props ) throws IOException {
this( tabtitle, type, null, null, props );
}
protected DiskBackedLoadingSheetData( String tabtitle, String sType, String oType,
String relname ) throws IOException {
this( tabtitle, sType, oType, relname, new HashMap<>() );
}
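/**
 * Copies the structure and all current rows of the given sheet into a new
 * disk-backed instance.
 *
 * @param model the sheet to copy
 * @throws IOException if the backing file cannot be created
 */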
public DiskBackedLoadingSheetData( LoadingSheetData model ) throws IOException {
this( model.getName(), model.getSubjectType(), model.getObjectType(),
model.getRelname(), model.getPropertiesAndDataTypes() );
Iterator<LoadingNodeAndPropertyValues> it = model.iterator();
while ( it.hasNext() ) {
add( it.next() );
}
}
protected DiskBackedLoadingSheetData( String tabtitle, String sType, String oType,
String relname, Map<String, URI> props ) throws IOException {
super( tabtitle, sType, oType, relname, props );
backingfile = File.createTempFile( tabtitle + "-", ".lsdata" );
backingfile.delete(); // don't keep an empty file around until we actually need it
log.debug( "backing file is: " + backingfile );
SimpleModule sm = new SimpleModule();
sm.addSerializer( LoadingNodeAndPropertyValues.class, new NapSerializer() );
sm.addDeserializer( LoadingNodeAndPropertyValues.class, new NapDeserializer() );
oxm.registerModule( sm );
}
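/**
 * Sets the maximum number of naps to keep in memory before spilling them to
 * the backing file. The iterator's read cache is sized to one-tenth of this
 * value.
 *
 * @param max the new limit; ignored unless positive
 */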
public void setMaxElementsInMemory( int max ) {
if ( max > 0 ) {
commitlimit = max;
readcachelimit = max / 10;
}
}
@Override
public boolean isMemOnly() {
return false;
}
/**
 * Releases all resources used by this instance, including the temporary
 * backing file.
 */
@Override
public void release() {
super.clear();
removedNodes.clear();
FileUtils.deleteQuietly( backingfile );
}
@Override
public void finishLoading() {
commit();
}
/**
 * Clears all stored loading data, both in memory and in the backing file
 */
@Override
public void clear() {
super.clear();
datacount = 0;
opsSinceLastCommit = 0;
removedNodes.clear();
// wipe the backing file, too; the next commit will recreate it
FileUtils.deleteQuietly( backingfile );
}
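/**
 * Appends all in-memory naps to the backing file, one JSON document per
 * line, and removes them from memory.
 */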
@Override
protected void commit() {
// flush everything to our backing file
if ( opsSinceLastCommit > 0 ) {
try ( BufferedWriter writer
= new BufferedWriter( new FileWriter( backingfile, true ) ) ) {
DataIterator it = super.iterator();
while ( it.hasNext() ) {
writer.write( oxm.writeValueAsString( it.next() ) );
writer.newLine();
it.remove();
}
opsSinceLastCommit = 0;
}
catch ( IOException ioe ) {
log.warn( "loading sheet internal commit failed", ioe );
}
}
}
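/**
 * Returns an iterator over all naps: rows already committed to the backing
 * file come first (skipping any marked as removed), followed by the rows
 * still in memory. Falls back to the purely in-memory iterator if the
 * backing file does not exist or cannot be opened.
 */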
@Override
public DataIterator iterator() {
if ( backingfile.exists() ) {
try {
return new CacheIterator();
}
catch ( IOException ioe ) {
log.warn( "cannot access backing file in iterator", ioe );
}
}
return super.iterator();
}
@Override
public List<LoadingNodeAndPropertyValues> getData() {
List<LoadingNodeAndPropertyValues> list = new ArrayList<>();
Iterator<LoadingNodeAndPropertyValues> it = iterator();
while ( it.hasNext() ) {
list.add( it.next() );
}
return list;
}
@Override
public int rows() {
return datacount;
}
@Override
public boolean isEmpty() {
return ( 0 == datacount );
}
@Override
protected void added( LoadingNodeAndPropertyValues nap ) {
datacount++;
tryCommit();
}
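/**
 * Counts one more operation, and spills everything to the backing file once
 * the in-memory limit is reached.
 */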
private void tryCommit() {
opsSinceLastCommit++;
if ( opsSinceLastCommit >= commitlimit ) {
commit();
}
}
@Override
public void removeAll( Collection<LoadingNodeAndPropertyValues> naps ) {
super.removeAll( naps );
removedNodes.addAll( naps );
// keep the row count in sync with removals
datacount -= naps.size();
}
@Override
public void remove( LoadingNodeAndPropertyValues nap ) {
super.remove( nap );
removedNodes.add( nap );
datacount--;
}
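/**
 * Writes one nap as a single-line JSON document. Illustrative output (the
 * values here are made up):
 * <pre>
 * {"subject":"s1","object":"o1","subjectIsError":false,"objectIsError":false,
 *  "properties":[{"prop":"price","value":"1.5","dt":"http://www.w3.org/2001/XMLSchema#double"}]}
 * </pre>
 */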
private static class NapSerializer extends JsonSerializer<LoadingNodeAndPropertyValues> {
@Override
public void serialize( LoadingNodeAndPropertyValues value, JsonGenerator jgen,
SerializerProvider provider ) throws IOException, JsonProcessingException {
jgen.writeStartObject();
jgen.writeStringField( "subject", value.getSubject() );
jgen.writeStringField( "object", value.getObject() );
jgen.writeBooleanField( "subjectIsError", value.isSubjectError() );
jgen.writeBooleanField( "objectIsError", value.isObjectError() );
jgen.writeArrayFieldStart( "properties" );
for ( Map.Entry<String, Value> en : value.entrySet() ) {
Value val = en.getValue();
jgen.writeStartObject();
jgen.writeStringField( "prop", en.getKey() );
jgen.writeStringField( "value", val.stringValue() );
if ( val instanceof Literal ) {
Literal l = Literal.class.cast( val );
if ( null == l.getLanguage() ) {
URI dt = l.getDatatype();
if ( null != dt ) {
jgen.writeStringField( "dt", dt.stringValue() );
}
}
else {
jgen.writeStringField( "lang", l.getLanguage() );
}
}
jgen.writeEndObject();
}
jgen.writeEndArray();
jgen.writeEndObject();
}
}
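/**
 * Reconstructs a nap from the single-line JSON written by
 * {@link NapSerializer}, rebuilding typed and language-tagged literals from
 * the optional "dt" and "lang" fields.
 */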
private class NapDeserializer extends JsonDeserializer<LoadingNodeAndPropertyValues> {
@Override
public LoadingNodeAndPropertyValues deserialize( JsonParser jp, DeserializationContext ctxt ) throws IOException, JsonProcessingException {
ValueFactory vf = ValueFactoryImpl.getInstance();
JsonNode node = jp.getCodec().readTree( jp );
LoadingNodeAndPropertyValues nap
= new LoadingNodeAndPropertyValues( node.get( "subject" ).asText() );
JsonNode oj = node.get( "object" );
if ( !oj.isNull() ) {
nap.setObject( oj.asText() );
}
nap.setSubjectIsError( node.get( "subjectIsError" ).asBoolean( false ) );
nap.setObjectIsError( node.get( "objectIsError" ).asBoolean( false ) );
Iterator<JsonNode> propit = node.get( "properties" ).elements();
while ( propit.hasNext() ) {
JsonNode propval = propit.next();
String prop = propval.get( "prop" ).asText();
String valstr = propval.get( "value" ).asText();
JsonNode lang = propval.get( "lang" );
if ( null == lang || lang.isNull() ) {
// no language, so check for datatype
JsonNode dt = propval.get( "dt" );
if ( null == dt || dt.isNull() ) {
// just do the best we can
nap.put( prop, RDFDatatypeTools.getRDFStringValue( valstr, null, vf ) );
}
else {
nap.put( prop, vf.createLiteral( valstr, vf.createURI( dt.asText() ) ) );
}
}
else {
nap.put( prop, vf.createLiteral( valstr, lang.asText() ) );
}
}
return nap;
}
}
/**
 * An iterator that walks the rows in the backing file first (through a
 * read cache), then the in-memory naps
 */
private class CacheIterator extends DataIteratorImpl {
private final BufferedReader reader;
private final Deque<LoadingNodeAndPropertyValues> readcache = new ArrayDeque<>();
private Iterator<LoadingNodeAndPropertyValues> memoryiter = null;
private LoadingNodeAndPropertyValues current;
public CacheIterator() throws IOException {
reader = new BufferedReader( new FileReader( backingfile ) );
cacheMoreFromStore();
}
@Override
public boolean hasNext() {
// we need to iterate over data from three places (in this order)
// 1) our readcache, until it is empty
// 2) our backing file, until it is empty
// 3) our in-memory data
// we'll populate our readcache by periodically filling it with data from
// our backing file
boolean hasnext = !readcache.isEmpty();
// our read cache is empty
if ( !hasnext ) {
// we haven't exhausted our backing file yet, so check there
if ( null == memoryiter ) {
hasnext = cacheMoreFromStore();
if ( !hasnext ) {
// close our file reader, since the file is empty
try {
reader.close();
}
catch ( IOException ioe ) {
log.warn( "problem closing file cache reader", ioe );
}
// our backing file is exhausted, so switch to our in-memory data
memoryiter = DiskBackedLoadingSheetData.super.iterator();
hasnext = memoryiter.hasNext();
}
}
else {
// we've already exhausted our backing file, so we're just reading
// from memory until we're out of data
hasnext = memoryiter.hasNext();
}
}
return hasnext;
}
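/**
 * Reads up to {@code readcachelimit} more rows from the backing file into
 * the read cache, skipping any naps that have been marked as removed.
 *
 * @return true, if the read cache is non-empty after reading
 */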
private boolean cacheMoreFromStore() {
int rowsread = 0;
try {
String json = null;
while ( rowsread < readcachelimit && null != ( json = reader.readLine() ) ) {
LoadingNodeAndPropertyValues nap
= oxm.readValue( json, LoadingNodeAndPropertyValues.class );
if ( !removedNodes.contains( nap ) ) {
rowsread++;
readcache.add( nap );
}
}
}
catch ( IOException ioe ) {
log.warn( "problem reading from cache", ioe );
}
return !readcache.isEmpty();
}
@Override
public LoadingNodeAndPropertyValues next() {
current = ( null == memoryiter ? readcache.poll() : memoryiter.next() );
return current;
}
@Override
public void remove() {
// if we're working off the backing store, just note the removed nap so the
// cache reader skips it; otherwise, remove it from the in-memory list
if ( null == memoryiter ) {
removedNodes.add( current );
}
else {
memoryiter.remove();
}
datacount--;
}
@Override
public void release() {
readcache.clear();
try {
reader.close();
}
catch ( IOException ioe ) {
// don't really care
}
}
}
}