Use file system pusher.

This commit is contained in:
Magnus Ulf 2015-05-21 18:43:43 +02:00 committed by Olof Larsson
parent 8c5f6055a4
commit b2bc9ed9ab
23 changed files with 1016 additions and 265 deletions

View File

@ -50,7 +50,8 @@ import com.massivecraft.massivecore.mson.Mson;
import com.massivecraft.massivecore.mson.MsonEvent;
import com.massivecraft.massivecore.ps.PS;
import com.massivecraft.massivecore.ps.PSAdapter;
import com.massivecraft.massivecore.store.ExamineThread;
import com.massivecraft.massivecore.store.ModificationPollerLocal;
import com.massivecraft.massivecore.store.ModificationPollerRemote;
import com.massivecraft.massivecore.teleport.EngineScheduledTeleport;
import com.massivecraft.massivecore.util.IdUtil;
import com.massivecraft.massivecore.util.MUtil;
@ -173,9 +174,6 @@ public class MassiveCore extends MassivePlugin
// TODO: Test and ensure reload compat.
// Coll.instances.clear();
// Start the examine thread
ExamineThread.get().start();
if ( ! preEnable()) return;
// Load Server Config
@ -202,6 +200,11 @@ public class MassiveCore extends MassivePlugin
AspectColl.get().init();
MassiveCoreMConfColl.get().init();
// Start the examine threads
// Start AFTER initializing the MConf, because they rely on the MConf.
ModificationPollerLocal.get().start();
ModificationPollerRemote.get().start();
// Register commands
this.outerCmdMassiveCore = new CmdMassiveCore() { public List<String> getAliases() { return MassiveCoreMConf.get().aliasesOuterMassiveCore; } };
this.outerCmdMassiveCore.register(this);
@ -234,7 +237,9 @@ public class MassiveCore extends MassivePlugin
public void onDisable()
{
super.onDisable();
ExamineThread.get().interrupt();
ModificationPollerLocal.get().interrupt();
ModificationPollerRemote.get().interrupt();
MassiveCoreTaskDeleteFiles.get().run();
IdUtil.saveCachefileDatas();
}

View File

@ -95,4 +95,13 @@ public class MassiveCoreMConf extends Entity<MassiveCoreMConf>
public String variableBuffer = "***buffer***";
public boolean usingVariableBuffer = true;
// -------------------------------------------- //
// MSTORE CONFIGURATON
// -------------------------------------------- //
public volatile long millisBetweenLocalPoll = 1000;
public volatile long millisBetweenLocalPollColl = 100;
public volatile long millisBetweenRemotePoll = 1000;
public volatile long millisBetweenRemotePollColl = 100;
}

View File

@ -8,7 +8,6 @@ import com.massivecraft.massivecore.command.MassiveCommand;
import com.massivecraft.massivecore.command.requirement.RequirementHasPerm;
import com.massivecraft.massivecore.command.type.store.TypeColl;
import com.massivecraft.massivecore.store.Coll;
import com.massivecraft.massivecore.store.ExamineThread;
import com.massivecraft.massivecore.util.MUtil;
import com.massivecraft.massivecore.util.Txt;
@ -51,7 +50,7 @@ public class CmdMassiveCoreStoreStats extends MassiveCommand
public void performTotal()
{
msg(Txt.titleize("MStore Total Statistics"));
msg("<k>Last Examine Duration: <v>%d<i>ms", ExamineThread.get().getLastDurationMillis());
//msg("<k>Last Examine Duration: <v>%d<i>ms", ExamineThread.get().getLastDurationMillis());
msg("<a>== <k>Coll <a>| <k>Sync Count In <a>| <k>Sync Count Out <a>==");
for (Entry<String, Coll<?>> entry : Coll.getMap().entrySet())
{

View File

@ -4,7 +4,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@ -94,6 +93,33 @@ public class Coll<E> extends CollAbstract<E>
protected Object collDriverObject;
@Override public Object getCollDriverObject() { return this.collDriverObject; }
@Override
public boolean supportsPusher()
{
return this.getDb().supportsPusher();
}
protected PusherColl pusher;
@Override
public PusherColl getPusher()
{
if (this.pusher == null) this.pusher = this.getDb().getPusher(this);
return this.pusher;
}
public String getDebugName()
{
String ret = this.getPlugin().getName() + "::" + this.getBasename();
if (this.getUniverse() != null)
{
ret += "@" + this.getUniverse();
}
return ret;
}
// -------------------------------------------- //
// STORAGE
// -------------------------------------------- //
@ -163,14 +189,25 @@ public class Coll<E> extends CollAbstract<E>
@Override public boolean isLowercasing() { return this.lowercasing; }
@Override public void setLowercasing(boolean lowercasing) { this.lowercasing = lowercasing; }
protected int localPollFrequency = 5;
@Override public int getLocalPollFrequency() { return this.localPollFrequency; }
@Override public void setLocalPollFrequency(int frequency) { this.localPollFrequency = frequency; }
// We often try to call Entity#changed to inform that an entity has been changed locally.
// And on some Colls we expect it to always be done.
// However we cannot be sure, but if we expect to always do it
// then we tell the collection to notify us if we failed to call Entity#changed.
protected boolean warnOnLocalAlter = false;
@Override public boolean isWarningOnLocalAlter() { return this.warnOnLocalAlter; }
@Override public void setWarnOnLocalAlter(boolean warnOnLocalAlter) { this.warnOnLocalAlter = warnOnLocalAlter; }
// Should that instance be saved or not?
// If it is default it should not be saved.
@SuppressWarnings("rawtypes")
@Override public boolean isDefault(E entity)
{
if (entity instanceof Entity)
{
return ((Entity)entity).isDefault();
return ((Entity<?>)entity).isDefault();
}
else
{
@ -215,8 +252,6 @@ public class Coll<E> extends CollAbstract<E>
// This simply creates and returns a new instance
// It does not detach/attach or anything. Just creates a new instance.
// TODO: Would it ever make sense for this to fail?
// Should we just throw an exception immediately if it fails?
@Override
public E createNewInstance()
{
@ -291,35 +326,37 @@ public class Coll<E> extends CollAbstract<E>
this.id2entity.put(id, entity);
this.entity2id.put(entity, id);
EntityMeta meta = new EntityMeta();
// Identify Modification
if (noteModification)
{
this.identifiedModifications.put(id, Modification.LOCAL_ATTACH);
}
this.metaData.put(id, meta);
// POST
this.postAttach(entity, id);
return id;
}
@SuppressWarnings("unchecked")
@Override
public E detachEntity(Object entity)
public E detachEntity(E entity)
{
if (entity == null) throw new NullPointerException("entity");
E e = (E)entity;
String id = this.getId(e);
String id = this.getId(entity);
if (id == null)
{
// It seems the entity is already detached.
// In such case just silently return it.
return e;
return entity;
}
this.detachFixed(e, id);
return e;
this.detachFixed(entity, id);
return entity;
}
@Override
@ -394,6 +431,15 @@ public class Coll<E> extends CollAbstract<E>
protected Map<String, Modification> identifiedModifications;
protected synchronized void putIdentifiedModificationFixed(String id, Modification modification)
{
if (id == null) throw new NullPointerException("id");
if (modification == null) throw new NullPointerException("modification");
Modification old = this.identifiedModifications.get(id);
if (old != null && modification.getPriority() <= old.getPriority()) return;
this.identifiedModifications.put(id, modification);
}
protected synchronized void removeIdentifiedModificationFixed(String id)
{
if (id == null) throw new NullPointerException("id");
@ -405,19 +451,27 @@ public class Coll<E> extends CollAbstract<E>
// -------------------------------------------- //
// The strings are the ids.
protected Map<String, Long> lastMtime;
protected Map<String, JsonElement> lastRaw;
protected Set<String> lastDefault;
protected Map<String, EntityMeta> metaData;
protected synchronized void clearSynclogFixed(String id)
protected EntityMeta getMetaFixed(String id)
{
if (id == null) throw new NullPointerException("id");
this.lastMtime.remove(id);
this.lastRaw.remove(id);
this.lastDefault.remove(id);
EntityMeta meta = this.metaData.get(id);
if (meta == null)
{
meta = new EntityMeta();
this.metaData.put(id, meta);
}
return meta;
}
protected EntityMeta setMetaFixed(String id, EntityMeta meta)
{
if (id == null) throw new NullPointerException("id");
if (meta == null) return this.metaData.remove(id);
else return this.metaData.put(id, meta);
}
// Log database synchronization for display in the "/massivecore mstore stats" command.
private Map<String, Long> id2out = new TreeMap<String, Long>();
private Map<String, Long> id2in = new TreeMap<String, Long>();
@ -455,7 +509,7 @@ public class Coll<E> extends CollAbstract<E>
if (id == null) throw new NullPointerException("id");
this.removeIdentifiedModificationFixed(id);
this.clearSynclogFixed(id);
this.setMetaFixed(id, null);
E entity = this.id2entity.remove(id);
if (entity == null) return null;
@ -478,7 +532,7 @@ public class Coll<E> extends CollAbstract<E>
if (id == null) throw new NullPointerException("id");
this.removeIdentifiedModificationFixed(id);
this.clearSynclogFixed(id);
this.setMetaFixed(id, null);
this.getDb().delete(this, id);
}
@ -489,28 +543,31 @@ public class Coll<E> extends CollAbstract<E>
if (id == null) throw new NullPointerException("id");
this.removeIdentifiedModificationFixed(id);
this.clearSynclogFixed(id);
this.setMetaFixed(id, null);
E entity = this.id2entity.get(id);
if (entity == null) return;
EntityMeta meta = this.getMetaFixed(id);
JsonElement raw = this.getGson().toJsonTree(entity, this.getEntityClass());
this.lastRaw.put(id, raw);
meta.setLastRaw(raw);
if (this.isDefault(entity))
{
this.getDb().delete(this, id);
this.lastDefault.add(id);
meta.setDefault(true);
}
else
{
long mtime = this.getDb().save(this, id, raw);
if (mtime == 0) return; // This fail should not happen often. We could handle it better though.
this.lastMtime.put(id, mtime);
// TODO: This fail should not happen often. We could handle it better though.
// Perhaps we should log it, or simply try again.
if (mtime == 0) return;
meta.setMtime(mtime);
}
}
@SuppressWarnings("unchecked")
@Override
public synchronized void loadFromRemoteFixed(String id, Entry<JsonElement, Long> remoteEntry)
{
@ -531,42 +588,13 @@ public class Coll<E> extends CollAbstract<E>
}
}
Long mtime = remoteEntry.getValue();
if (mtime == null)
{
logLoadError(id, "Last modification time (mtime) was null. The file might not be readable or simply not exist.");
return;
}
if (mtime == 0)
{
logLoadError(id, "Last modification time (mtime) was 0. The file might not be readable or simply not exist.");
return;
}
JsonElement raw = remoteEntry.getKey();
if (raw == null)
{
logLoadError(id, "Raw data was null. Is the file completely empty?");
return;
}
if (raw.isJsonNull())
{
logLoadError(id, "Raw data was JSON null. It seems you have a file containing just the word \"null\". Why would you do this?");
return;
}
Long mtime = remoteEntry.getValue();
if ( ! this.remoteEntryIsOk(id, remoteEntry)) return;
// Calculate temp but handle raw cases.
E temp = null;
if (this.getEntityClass().isAssignableFrom(JsonObject.class))
{
temp = (E) raw;
}
else
{
temp = this.getGson().fromJson(raw, this.getEntityClass());
}
E entity = this.get(id, false);
E temp = this.getGson().fromJson(raw, this.getEntityClass());
E entity = this.getFixed(id, false);
if (entity != null)
{
// It did already exist
@ -584,9 +612,41 @@ public class Coll<E> extends CollAbstract<E>
this.attach(entity, id, false);
}
this.lastRaw.put(id, raw);
this.lastMtime.put(id, mtime);
this.lastDefault.remove(id);
EntityMeta meta = this.getMetaFixed(id);
meta.setLastRaw(raw);
meta.setMtime(mtime);
meta.setDefault(false);
}
public boolean remoteEntryIsOk(String id, Entry<JsonElement, Long> remoteEntry)
{
Long mtime = remoteEntry.getValue();
if (mtime == null)
{
this.logLoadError(id, "Last modification time (mtime) was null. The file might not be readable or simply not exist.");
return false;
}
if (mtime == 0)
{
this.logLoadError(id, "Last modification time (mtime) was 0. The file might not be readable or simply not exist.");
return false;
}
JsonElement raw = remoteEntry.getKey();
if (raw == null)
{
this.logLoadError(id, "Raw data was null. Is the file completely empty?");
return false;
}
if (raw.isJsonNull())
{
this.logLoadError(id, "Raw data was JSON null. It seems you have a file containing just the word \"null\". Why would you do this?");
return false;
}
return true;
}
public void logLoadError(String entityId, String error)
@ -605,16 +665,16 @@ public class Coll<E> extends CollAbstract<E>
public Modification examineIdFixed(String id, Long remoteMtime)
{
if (id == null) throw new NullPointerException("id");
// Local Attach and Detach has the top priority.
// Otherwise newly attached entities would be removed thinking it was a remote detach.
// Otherwise newly detached entities would be loaded thinking it was a remote attach.
Modification ret = this.identifiedModifications.get(id);
// Meta might be non-existing. But then we create it here.
// If it is examined then it will be attached anyways.
EntityMeta meta = this.getMetaFixed(id);
Modification current = this.identifiedModifications.get(id);
// DEBUG
// if (Bukkit.isPrimaryThread())
// {
// MassiveCore.get().log(Txt.parse("<a>examineId <k>Coll: <v>%s <k>Entity: <v>%s <k>Modification: <v>%s", this.getName(), id, ret));
// }
if (ret == Modification.LOCAL_ATTACH || ret == Modification.LOCAL_DETACH) return ret;
if (current != null && current.hasTopPriority()) return current;
E localEntity = this.id2entity.get(id);
if (remoteMtime == null)
@ -626,42 +686,118 @@ public class Coll<E> extends CollAbstract<E>
boolean existsLocal = (localEntity != null);
boolean existsRemote = (remoteMtime != 0);
// So we don't have this anywhere?
if ( ! existsLocal && ! existsRemote) return Modification.UNKNOWN;
// If we have it both locally and remotely.
if (existsLocal && existsRemote)
{
Long lastMtime = this.lastMtime.get(id);
if (remoteMtime.equals(lastMtime) == false) return Modification.REMOTE_ALTER;
long lastMtime = meta.getMtime();
// If mtime is not equal remote takes priority, and we assume it is altered.
if ( ! remoteMtime.equals(lastMtime)) return Modification.REMOTE_ALTER;
// Else we check for a local alter.
if (this.examineHasLocalAlterFixed(id, localEntity)) return Modification.LOCAL_ALTER;
}
// If we just have it locally...
else if (existsLocal)
{
if (this.lastDefault.contains(id))
// ...and it was default and thus not saved to the db...
if (meta.isDefault())
{
// ...and also actually altered.
// Then it is a local alter.
if (this.examineHasLocalAlterFixed(id, localEntity)) return Modification.LOCAL_ALTER;
}
// ...otherwise it was detached remotely.
else
{
return Modification.REMOTE_DETACH;
}
}
// If we just have it remotely. It was attached there.
else if (existsRemote)
{
return Modification.REMOTE_ATTACH;
}
// No modification was made.
return Modification.NONE;
}
@Override
public Modification examineIdLocalFixed(final String id)
{
if (id == null) throw new NullPointerException("id");
// A local entity should have a meta.
Modification current = this.identifiedModifications.get(id);
if (current != null && current.hasTopPriority()) return current;
E localEntity = this.id2entity.get(id);
// If not existing, then wtf.
if (localEntity == null) return Modification.UNKNOWN;
// Altered locally.
if (this.examineHasLocalAlterFixed(id, localEntity)) return Modification.LOCAL_ALTER;
// Not altered locally.
return Modification.NONE;
}
@Override
public Modification examineIdRemoteFixed(final String id, Long remoteMtime)
{
if (id == null) throw new NullPointerException("id");
// We will always know beforehand, when local attach and detach is done.
// Because they are performed by calling a method on this coll.
// Meta might be non-existing. But then we create it here.
// If it is examined then it will be attached anyways.
EntityMeta meta = this.getMetaFixed(id);
Modification current = this.identifiedModifications.get(id);
if (current != null && current.hasTopPriority()) return current;
if (remoteMtime == null)
{
// TODO: This is slow
remoteMtime = this.getDb().getMtime(this, id);
}
E localEntity = this.id2entity.get(id);
boolean existsLocal = (localEntity != null);
boolean existsRemote = (remoteMtime != 0);
// So we don't have this anywhere?
if ( ! existsLocal && ! existsRemote) return Modification.UNKNOWN;
Long lastMtime = meta.getMtime();
// If time is different
// then it is remotely altered
if (existsLocal && existsRemote && ! lastMtime.equals(remoteMtime)) return Modification.REMOTE_ALTER;
// If it doesn't exist remotely, and it wasn't because it was default. It was detached remotely.
if (!existsRemote && ! meta.isDefault()) return Modification.REMOTE_DETACH;
// If it doesn't exist locally, it was attached remotely.
if (!existsLocal) return Modification.REMOTE_ATTACH;
// No modification spotted.
return Modification.NONE;
}
protected boolean examineHasLocalAlterFixed(String id, E entity)
{
JsonElement lastRaw = this.lastRaw.get(id);
JsonElement lastRaw = this.getMetaFixed(id).getLastRaw();
JsonElement currentRaw = null;
try
{
currentRaw = this.getGson().toJsonTree(entity, this.getEntityClass());
currentRaw = this.getGson().toJsonTree(entity);
}
catch (Exception e)
{
@ -686,6 +822,8 @@ public class Coll<E> extends CollAbstract<E>
modification = this.examineIdFixed(id, remoteMtime);
}
if (MStore.DEBUG_ENABLED) System.out.println(this.getDebugName() + " syncronising " + modification + " on " + id);
// DEBUG
// MassiveCore.get().log(Txt.parse("<a>syncId <k>Coll: <v>%s <k>Entity: <v>%s <k>Modification: <v>%s", this.getName(), id, modification));
@ -735,41 +873,118 @@ public class Coll<E> extends CollAbstract<E>
}
@Override
public void identifyModifications()
public void identifyModifications(boolean sure)
{
if (MStore.DEBUG_ENABLED) System.out.println(this.getDebugName() + " polling for all changes");
// Get remote id2mtime snapshot
Map<String, Long> id2RemoteMtime = this.getDb().getId2mtime(this);
// Compile a list of all ids (both remote and local)
Set<String> allids = new HashSet<String>();
allids.addAll(id2RemoteMtime.keySet());
allids.addAll(this.id2entity.keySet());
// Java 8
//this.id2entity.keySet().forEach(id -> id2RemoteMtime.putIfAbsent(id, 0));
// Java 8 >
for (String id : this.id2entity.keySet())
{
if (id2RemoteMtime.containsKey(id)) continue;
id2RemoteMtime.put(id, 0L);
}
// Check for modifications
for (String id : allids)
for (Entry<String, Long> entry : id2RemoteMtime.entrySet())
{
Long remoteMtime = id2RemoteMtime.get(id);
if (remoteMtime == null) remoteMtime = 0L;
Modification modification = this.examineIdFixed(id, remoteMtime);
if (modification.isModified())
{
this.identifiedModifications.put(id, modification);
}
this.identifyModificationFixed(entry.getKey(), entry.getValue(), sure);
}
}
@Override
public void syncIdentified(boolean safe)
public void identifyModificationFixed(String id, Long remoteMtime, boolean sure)
{
if (id == null) throw new NullPointerException("id");
Modification modification = this.examineIdFixed(id, remoteMtime);
this.storeModificationIdentification(id, modification, sure);
}
@Override
public void identifyLocalModifications(boolean sure)
{
if (MStore.DEBUG_ENABLED) System.out.println(this.getDebugName() + " polling for local changes");
for (String id : id2entity.keySet())
{
this.identifyLocalModificationFixed(id, sure);
}
}
@Override
public void identifyLocalModificationFixed(String id, boolean sure)
{
if (id == null) throw new NullPointerException("id");
Modification modification = this.examineIdLocalFixed(id);
this.storeModificationIdentification(id, modification, sure);
}
@Override
public void identifyRemoteModifications(boolean sure)
{
if (MStore.DEBUG_ENABLED) System.out.println(this.getDebugName() + " polling for remote changes");
// Get remote id2mtime snapshot
Map<String, Long> id2RemoteMtime = this.getDb().getId2mtime(this);
//Note: We must also check local ids, in case of remote detach.
// Java 8
//this.id2entity.keySet().forEach(id -> id2RemoteMtime.putIfAbsent(id, 0));
// Java 8 >
for (String id : this.id2entity.keySet())
{
if (id2RemoteMtime.containsKey(id)) continue;
id2RemoteMtime.put(id, 0L);
}
// Check for modifications
for (Entry<String, Long> entry : id2RemoteMtime.entrySet())
{
this.identifyRemoteModificationFixed(entry.getKey(), entry.getValue(), sure);
}
}
@Override
public void identifyRemoteModificationFixed(String id, Long remoteMtime, boolean sure)
{
if (id == null) throw new NullPointerException("id");
Modification modification = this.examineIdRemoteFixed(id, remoteMtime);
this.storeModificationIdentification(id, modification, sure);
}
protected void storeModificationIdentification(String id, Modification modification, boolean sure)
{
if (this.isWarningOnLocalAlter() && modification == Modification.LOCAL_ALTER)
{
MassiveCore.get().log(
"A local alter was made in " + this.getName() + " on " + id,
"This was unintended, the developers should be informed."
);
}
if (modification.isModified())
{
if (MStore.DEBUG_ENABLED) System.out.println(this.getDebugName() + " identified " + modification + " on " + id);
if (!sure && ! modification.isSafe()) modification = Modification.UNKNOWN;
this.putIdentifiedModificationFixed(id, modification);
}
}
@Override
public void syncIdentified()
{
for (Entry<String, Modification> entry : this.identifiedModifications.entrySet())
{
String id = entry.getKey();
Modification modification = entry.getValue();
if (safe)
{
modification = null;
}
this.syncIdFixed(id, modification);
}
}
@ -777,8 +992,8 @@ public class Coll<E> extends CollAbstract<E>
@Override
public void syncAll()
{
this.identifyModifications();
this.syncIdentified(false);
this.identifyModifications(true);
this.syncIdentified();
}
@Override
@ -804,7 +1019,7 @@ public class Coll<E> extends CollAbstract<E>
@Override
public void onTick()
{
this.syncIdentified(true);
this.syncIdentified();
}
// -------------------------------------------- //
@ -841,14 +1056,10 @@ public class Coll<E> extends CollAbstract<E>
this.id2entity = (sorted) ? new ConcurrentSkipListMap<String, E>(NaturalOrderComparator.get()) : new ConcurrentHashMap<String, E>();
this.entity2id = (Entity.class.isAssignableFrom(entityClass) && sorted) ? new ConcurrentSkipListMap<E, String>((Comparator<? super E>) ComparatorEntityId.get()) : new ConcurrentHashMap<E, String>();
// IDENTIFIED MODIFICATIONS
// ENTITY DATA
this.metaData = new ConcurrentHashMap<String, EntityMeta>();
this.identifiedModifications = new ConcurrentHashMap<String, Modification>();
// SYNCLOG
this.lastMtime = new ConcurrentHashMap<String, Long>();
this.lastRaw = new ConcurrentHashMap<String, JsonElement>();
this.lastDefault = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
this.tickTask = new Runnable()
{
@Override public void run() { Coll.this.onTick(); }
@ -865,6 +1076,11 @@ public class Coll<E> extends CollAbstract<E>
{
if (this.inited()) return; // TODO: Would throwing an exception make more sense?
if (this.supportsPusher())
{
//this.getPusher().init();
}
this.initLoadAllFromRemote();
// this.syncAll();
@ -876,6 +1092,11 @@ public class Coll<E> extends CollAbstract<E>
{
if ( ! this.inited()) return; // TODO: Would throwing an exception make more sense?
if (this.supportsPusher())
{
//this.getPusher().deinit();
}
// TODO: Save outwards only? We may want to avoid loads at this stage...
this.syncAll();
@ -887,5 +1108,5 @@ public class Coll<E> extends CollAbstract<E>
{
return name2instance.containsKey(this.getName());
}
}

View File

@ -191,10 +191,40 @@ public abstract class CollAbstract<E> implements CollInterface<E>
@Override
public Modification examineIdFixed(String id)
{
// Null check done later.
if (id == null) throw new NullPointerException("id");
return this.examineIdFixed(id, null);
}
// Examine local
@Override
public Modification examineIdLocal(Object oid)
{
if (oid == null) throw new NullPointerException("oid");
return this.examineIdLocalFixed(this.fixIdOrThrow(oid));
}
// Examine remote
@Override
public Modification examineIdRemote(Object oid)
{
if (oid == null) throw new NullPointerException("oid");
return this.examineIdRemoteFixed(this.fixIdOrThrow(oid));
}
@Override
public Modification examineIdRemote(Object oid, Long remoteMtime)
{
if (oid == null) throw new NullPointerException("oid");
return this.examineIdRemoteFixed(this.fixIdOrThrow(oid), remoteMtime);
}
@Override
public Modification examineIdRemoteFixed(String id)
{
if (id == null) throw new NullPointerException("id");
return this.examineIdRemoteFixed(id, null);
}
// Sync
@Override
public Modification syncId(Object oid)
@ -204,30 +234,31 @@ public abstract class CollAbstract<E> implements CollInterface<E>
}
@Override
public Modification syncId(Object oid, Modification modificationState)
public Modification syncId(Object oid, Modification modification)
{
if (oid == null) throw new NullPointerException("oid");
return this.syncIdFixed(this.fixIdOrThrow(oid), modificationState);
return this.syncIdFixed(this.fixIdOrThrow(oid), modification);
}
@Override
public Modification syncId(Object oid, Modification modificationState, Entry<JsonElement, Long> remoteEntry)
public Modification syncId(Object oid, Modification modification, Entry<JsonElement, Long> remoteEntry)
{
if (oid == null) throw new NullPointerException("oid");
return this.syncIdFixed(this.fixIdOrThrow(oid), modificationState, remoteEntry);
return this.syncIdFixed(this.fixIdOrThrow(oid), modification, remoteEntry);
}
// Sync fixed
@Override
public Modification syncIdFixed(String id)
{
// Null check done later.
if (id == null) throw new NullPointerException("id");
return this.syncIdFixed(id, null);
}
@Override
public Modification syncIdFixed(String id, Modification modification)
{
// Null check done later.
if (id == null) throw new NullPointerException("id");
return this.syncIdFixed(id, modification, null);
}

View File

@ -32,6 +32,9 @@ public interface CollInterface<E> extends Named
public Db getDb();
public Object getCollDriverObject();
public boolean supportsPusher();
public PusherColl getPusher();
// -------------------------------------------- //
// STORAGE
// -------------------------------------------- //
@ -90,6 +93,12 @@ public interface CollInterface<E> extends Named
public boolean isLowercasing();
public void setLowercasing(boolean lowercasing);
public int getLocalPollFrequency();
public void setLocalPollFrequency(int frequency);
public boolean isWarningOnLocalAlter();
public void setWarnOnLocalAlter(boolean warnOnLocalAlter);
// A default entity will not be saved.
// This is often used together with creative collections to save disc space.
public boolean isDefault(E entity);
@ -116,7 +125,7 @@ public interface CollInterface<E> extends Named
public String attach(E entity);
public String attach(E entity, Object oid);
public E detachEntity(Object entity);
public E detachEntity(E entity);
public E detachId(Object oid);
public E detachIdFixed(String id);
@ -127,7 +136,7 @@ public interface CollInterface<E> extends Named
public void postDetach(E entity, String id);
// -------------------------------------------- //
// IDENTIFIED CHANGES
// IDENTIFIED MODIFICATIONS
// -------------------------------------------- //
/*
@ -175,24 +184,41 @@ public interface CollInterface<E> extends Named
// oid
public Modification examineId(Object oid);
public Modification examineId(Object oid, Long remoteMtime);
public Modification examineIdLocal(Object oid);
public Modification examineIdRemote(Object oid);
public Modification examineIdRemote(Object oid, Long remoteMtime);
// Fixed id
public Modification examineIdFixed(String id);
public Modification examineIdFixed(String id, Long remoteMtime);
public Modification examineIdLocalFixed(String id);
public Modification examineIdRemoteFixed(String id);
public Modification examineIdRemoteFixed(String id, Long remoteMtime);
// oid
// Sync
public Modification syncId(Object oid);
public Modification syncId(Object oid, Modification modificationState);
public Modification syncId(Object oid, Modification modificationState, Entry<JsonElement, Long> remoteEntry);
public Modification syncId(Object oid, Modification modification);
public Modification syncId(Object oid, Modification modification, Entry<JsonElement, Long> remoteEntry);
// fixed id
// Sync fixed
public Modification syncIdFixed(String id);
public Modification syncIdFixed(String id, Modification modificationState);
public Modification syncIdFixed(String id, Modification modificationState, Entry<JsonElement, Long> remoteEntry);
public Modification syncIdFixed(String id, Modification modification);
public Modification syncIdFixed(String id, Modification modification, Entry<JsonElement, Long> remoteEntry);
public void syncIdentified(boolean safe);
public void syncIdentified();
public void syncAll();
public void identifyModifications();
// Identity
public void identifyModifications(boolean sure);
public void identifyModificationFixed(String id, Long remoteMtime, boolean sure);
public void identifyLocalModifications(boolean sure);
public void identifyLocalModificationFixed(String id, boolean sure);
public void identifyRemoteModifications(boolean sure);
public void identifyRemoteModificationFixed(String id, Long remoteMtime, boolean sure);
// Init
public void initLoadAllFromRemote();
// -------------------------------------------- //

View File

@ -28,7 +28,7 @@ public interface Db
// -------------------------------------------- //
public String getDriverName();
public Db getDb(String uri);
public Db getDb(String uri); // TODO: This seems a bit odd.
public boolean dropDb();
public Set<String> getCollnames();
public boolean renameColl(String from, String to);
@ -40,5 +40,7 @@ public interface Db
public Map<String, Entry<JsonElement, Long>> loadAll(Coll<?> coll);
public long save(Coll<?> coll, String id, JsonElement data);
public void delete(Coll<?> coll, String id);
public boolean supportsPusher();
public PusherColl getPusher(Coll<?> coll);
}
}

View File

@ -77,4 +77,13 @@ public abstract class DbAbstract implements Db
{
this.getDriver().delete(coll, id);
}
public boolean supportsPusher()
{
return this.getDriver().supportsPusher();
}
public PusherColl getPusher(Coll<?> coll)
{
return this.getDriver().getPusher(coll);
}
}

View File

@ -57,4 +57,8 @@ public interface Driver
// Delete X
public void delete(Coll<?> coll, String id);
// Database pusher
public boolean supportsPusher();
public PusherColl getPusher(Coll<?> coll);
}

View File

@ -1,6 +1,7 @@
package com.massivecraft.massivecore.store;
import java.io.File;
import java.io.IOException;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Collection;
@ -9,8 +10,8 @@ import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.Set;
import com.massivecraft.massivecore.util.DiscUtil;
import com.massivecraft.massivecore.xlib.gson.JsonElement;
@ -61,7 +62,7 @@ public class DriverFlatfile extends DriverAbstract
return false;
}
}
@Override
public Set<String> getCollnames(Db db)
{
@ -84,7 +85,7 @@ public class DriverFlatfile extends DriverAbstract
File fileTo = new File(dir, to);
return fileFrom.renameTo(fileTo);
}
@Override
public boolean containsId(Coll<?> coll, String id)
{
@ -123,7 +124,7 @@ public class DriverFlatfile extends DriverAbstract
// Get Directory
File directory = getDirectory(coll);
if ( ! directory.isDirectory()) return ret;
if ( ! directory.isDirectory()) return ret; // TODO: Throw exception instead?
// For each .json file
for (File file : directory.listFiles(JsonFileFilter.get()))
@ -199,7 +200,7 @@ public class DriverFlatfile extends DriverAbstract
// Return Ret
return ret;
}
@Override
public long save(Coll<?> coll, String id, JsonElement data)
{
@ -208,7 +209,7 @@ public class DriverFlatfile extends DriverAbstract
if (DiscUtil.writeCatch(file, content) == false) return 0;
return file.lastModified();
}
@Override
public void delete(Coll<?> coll, String id)
{
@ -216,6 +217,25 @@ public class DriverFlatfile extends DriverAbstract
file.delete();
}
@Override
public boolean supportsPusher()
{
return true;
}
@Override
public PusherColl getPusher(Coll<?> coll)
{
try
{
return new PusherCollFlatfile(coll);
}
catch (IOException e)
{
throw new RuntimeException("Failed to create a flatfile system pusher.", e);
}
}
// -------------------------------------------- //
// UTIL
// -------------------------------------------- //
@ -238,5 +258,5 @@ public class DriverFlatfile extends DriverAbstract
File idFile = new File(collDir, id + DOTJSON);
return idFile;
}
}

View File

@ -273,6 +273,19 @@ public class DriverMongo extends DriverAbstract
dbcoll.remove(new BasicDBObject(ID_FIELD, id), MassiveCoreMConf.get().getMongoDbWriteConcernDelete());
}
@Override
public boolean supportsPusher()
{
return false;
}
@Override
public PusherColl getPusher(Coll<?> coll)
{
throw new UnsupportedOperationException("MongoDB does not have a pusher change.");
}
//----------------------------------------------//
// UTIL
//----------------------------------------------//

View File

@ -50,7 +50,7 @@ public abstract class Entity<E extends Entity<E>>
Coll<E> coll = this.getColl();
if (coll == null) return (E)this;
return coll.detachEntity(this);
return coll.detachEntity((E) this);
}
public boolean attached()
@ -105,35 +105,31 @@ public abstract class Entity<E extends Entity<E>>
{
if ( ! this.isLive()) return;
//System.out.println(this.getColl().getName() + ": " +this.getId() + " was modified locally");
// UNKNOWN is very unimportant really.
// LOCAL_ATTACH is for example much more important and should not be replaced.
if ( ! coll.identifiedModifications.containsKey(id))
{
coll.identifiedModifications.put(id, Modification.UNKNOWN);
}
this.getColl().putIdentifiedModificationFixed(this.getId(), Modification.UNKNOWN);
}
public Modification sync()
{
String id = this.getId();
if (id == null) return Modification.UNKNOWN;
return this.getColl().syncId(id);
if ( ! this.isLive()) return Modification.UNKNOWN;
return this.getColl().syncIdFixed(id);
}
public void saveToRemote()
{
String id = this.getId();
if (id == null) return;
if ( ! this.isLive()) return;
this.getColl().saveToRemote(id);
this.getColl().saveToRemoteFixed(id);
}
public void loadFromRemote()
{
String id = this.getId();
if (id == null) return;
if ( ! this.isLive()) return;
this.getColl().loadFromRemote(id, null);
this.getColl().loadFromRemoteFixed(id, null);
}
// -------------------------------------------- //

View File

@ -0,0 +1,32 @@
package com.massivecraft.massivecore.store;
import com.massivecraft.massivecore.xlib.gson.JsonElement;
//TODO: Merge with entity
public class EntityMeta
{
// -------------------------------------------- //
// CONSTANTS
// -------------------------------------------- //
public static final JsonElement DEFAULT_LAST_RAW = null;
public static final long DEFAULT_MTIME = 0;
public static final boolean DEFAULT_DEFAULT = false;
// -------------------------------------------- //
// FIELDS
// -------------------------------------------- //
private volatile JsonElement lastRaw = DEFAULT_LAST_RAW;
public JsonElement getLastRaw() { return this.lastRaw; }
public void setLastRaw(JsonElement lastRaw) { this.lastRaw = lastRaw; }
private volatile long mtime = DEFAULT_MTIME;
public long getMtime() { return this.mtime; }
public void setMtime(long mtime) { this.mtime = mtime; }
private volatile boolean isDefault = DEFAULT_DEFAULT;
public boolean isDefault() { return this.isDefault; }
public void setDefault(boolean isDefault) { this.isDefault = isDefault; }
}

View File

@ -1,68 +0,0 @@
package com.massivecraft.massivecore.store;
public class ExamineThread extends Thread
{
// -------------------------------------------- //
// INSTANCE
// -------------------------------------------- //
private static ExamineThread i = null;
public static ExamineThread get()
{
if (i == null || !i.isAlive()) i = new ExamineThread();
return i;
}
// -------------------------------------------- //
// CONSTRUCT
// -------------------------------------------- //
public ExamineThread()
{
this.setName("MStore ExamineThread");
}
// -------------------------------------------- //
// FIELDS
// -------------------------------------------- //
private long lastDurationMillis = 0;
public long getLastDurationMillis() { return this.lastDurationMillis; }
// -------------------------------------------- //
// OVERRIDE
// -------------------------------------------- //
@Override
public void run()
{
while (true)
{
try
{
long before = System.currentTimeMillis();
for (Coll<?> coll : Coll.getInstances())
{
coll.identifyModifications();
}
long after = System.currentTimeMillis();
long duration = after-before;
this.lastDurationMillis = duration;
//String message = Txt.parse("<i>ExamineThread iteration took <h>%dms<i>.", after-before);
//MassiveCore.get().log(message);
Thread.sleep(5000);
}
catch (InterruptedException e)
{
// We've been interrupted. Lets bail.
return;
}
catch (Exception e)
{
e.printStackTrace();
}
}
}
}

View File

@ -11,6 +11,7 @@ import com.massivecraft.massivecore.xlib.gson.JsonPrimitive;
public class GsonCloner
{
// All
public static JsonElement clone(JsonElement element)
{
// null
@ -20,39 +21,48 @@ public class GsonCloner
if (element.isJsonNull()) return JsonNull.INSTANCE;
// JsonPrimitive
if (element.isJsonPrimitive())
{
JsonPrimitive primitive = element.getAsJsonPrimitive();
if (primitive.isBoolean()) return new JsonPrimitive(primitive.getAsBoolean());
if (primitive.isString()) return new JsonPrimitive(primitive.getAsString());
if (primitive.isNumber()) return new JsonPrimitive(primitive.getAsNumber());
throw new UnsupportedOperationException("The json primitive: " + primitive + " was not a boolean, number or string");
}
if (element.isJsonPrimitive()) return cloneJsonPrimitive(element.getAsJsonPrimitive());
// JsonObject
if (element.isJsonObject())
{
JsonObject ret = new JsonObject();
for (Entry<String, JsonElement> entry : ((JsonObject)element).entrySet())
{
ret.add(entry.getKey(), clone(entry.getValue()));
}
return ret;
}
if (element.isJsonObject()) return cloneJsonObject(element.getAsJsonObject());
// JsonArray
if (element.isJsonArray())
{
JsonArray ret = new JsonArray();
Iterator<JsonElement> iter = ((JsonArray)element).iterator();
while (iter.hasNext())
{
ret.add(clone(iter.next()));
}
return ret;
}
if (element.isJsonArray()) return cloneJsonArray(element.getAsJsonArray());
// Unknown
throw new RuntimeException("Unknown JsonElement class: " + element.getClass().getName());
}
// Primitive
public static JsonPrimitive cloneJsonPrimitive(JsonPrimitive primitive)
{
if (primitive.isBoolean()) return new JsonPrimitive(primitive.getAsBoolean());
if (primitive.isString()) return new JsonPrimitive(primitive.getAsString());
if (primitive.isNumber()) return new JsonPrimitive(primitive.getAsNumber());
throw new UnsupportedOperationException("The json primitive: " + primitive + " was not a boolean, number or string");
}
// Object
public static JsonObject cloneJsonObject(JsonObject object)
{
JsonObject ret = new JsonObject();
for (Entry<String, JsonElement> entry : object.entrySet())
{
ret.add(entry.getKey(), clone(entry.getValue()));
}
return ret;
}
// Array
public static JsonArray cloneJsonArray(JsonArray array)
{
JsonArray ret = new JsonArray();
for (Iterator<JsonElement> iter = array.iterator(); iter.hasNext();)
{
ret.add(clone(iter.next()));
}
return ret;
}
}

View File

@ -9,7 +9,7 @@ public class JsonFileFilter implements FileFilter
// CONSTANTS
// -------------------------------------------- //
private final static String DOTJSON = ".json";
public final static String DOTJSON = ".json";
// -------------------------------------------- //
// INSTANCE & CONSTRUCT

View File

@ -11,6 +11,12 @@ import com.massivecraft.massivecore.xlib.gson.JsonElement;
public class MStore
{
// -------------------------------------------- //
// CONSTANTS
// -------------------------------------------- //
public static boolean DEBUG_ENABLED = false;
// -------------------------------------------- //
// DRIVER REGISTRY
// -------------------------------------------- //
@ -103,5 +109,4 @@ public class MStore
return driver.getDb(uri.toString());
}
}
}

View File

@ -2,20 +2,27 @@ package com.massivecraft.massivecore.store;
public enum Modification
{
// -------------------------------------------- //
// ENUM
// -------------------------------------------- //
LOCAL_ALTER (true, true),
LOCAL_ATTACH (true, true),
LOCAL_DETACH (true, true),
REMOTE_ALTER (true, false),
REMOTE_ATTACH (true, false),
REMOTE_DETACH (true, false),
NONE (false, false),
UNKNOWN (false, false),
LOCAL_ALTER (true, 3),
LOCAL_ATTACH (true, 7),
LOCAL_DETACH (true, 8),
REMOTE_ALTER (true, 4),
REMOTE_ATTACH (true, 5),
REMOTE_DETACH (true, 6),
NONE (false, 1),
UNKNOWN (false, 2),
;
// -------------------------------------------- //
// CONSTANTS
// -------------------------------------------- //
public static final int TOP_PRIORITY = 7;
// -------------------------------------------- //
// FIELDS
// -------------------------------------------- //
@ -23,18 +30,49 @@ public enum Modification
private final boolean modified;
public boolean isModified() { return this.modified; }
private final boolean local;
public boolean isLocal() { return this.local; }
public boolean isRemote() { return this.local == false; }
private final int priority;
public int getPriority() { return this.priority; }
// -------------------------------------------- //
// CONSTRUCT
// -------------------------------------------- //
private Modification(boolean modified, boolean local)
private Modification(boolean modified, int priority)
{
this.modified = modified;
this.local = local;
this.priority = priority;
}
// -------------------------------------------- //
// LOCAL VS REMOTE
// -------------------------------------------- //
public boolean isLocal()
{
return this == LOCAL_ALTER || this == LOCAL_ATTACH || this == LOCAL_DETACH;
}
public boolean isRemote()
{
return this == REMOTE_ALTER || this == REMOTE_ATTACH || this == REMOTE_DETACH;
}
// -------------------------------------------- //
// SAFE
// -------------------------------------------- //
// If a modification state is safe,
// that means that you can always be sure that when it is saved
// in identifiedModifications, it ensured to actually be that.
public boolean isSafe()
{
return this == LOCAL_ATTACH || this == LOCAL_DETACH;
}
// Local Attach and Detach has the top priority.
// Otherwise newly attached entities would be removed thinking it was a remote detach.
// Otherwise newly detached entities would be loaded thinking it was a remote attach.
public boolean hasTopPriority()
{
return this.getPriority() >= TOP_PRIORITY;
}
}

View File

@ -0,0 +1,77 @@
package com.massivecraft.massivecore.store;
public abstract class ModificationPollerAbstract extends Thread
{
// -------------------------------------------- //
// CONSTRUCT
// -------------------------------------------- //
public ModificationPollerAbstract()
{
this.setName("MStore " + this.getClass().getSimpleName());
}
// -------------------------------------------- //
// FIELDS
// -------------------------------------------- //
private long iterationCount = 1;
// -------------------------------------------- //
// OVERRIDE
// -------------------------------------------- //
@Override
public void run()
{
while (true)
{
try
{
//System.out.println("Polling locally: " + MassiveCoreMConf.get().millisBetweenLocalPoll);
this.identify();
iterationCount++;
//String message = Txt.parse("<i>LocalModificationThread iteration took <h>%dms<i>.", after-before);
//MassiveCore.get().log(message);
Thread.sleep(this.getMillisBetweenPoll());
}
catch (InterruptedException e)
{
// We've been interrupted. Lets bail.
return;
}
catch (Exception e)
{
System.out.println("Poller error for" + this.getName());
e.printStackTrace();
}
}
}
// -------------------------------------------- //
// CORE
// -------------------------------------------- //
public void identify() throws InterruptedException
{
final long waitEfterColl = this.getMillisBetweenPollColl();
for (Coll<?> coll : Coll.getInstances())
{
if (this.poll(coll, this.iterationCount))
{
Thread.sleep(waitEfterColl);
}
}
}
// -------------------------------------------- //
// ABSTRACT
// -------------------------------------------- //
public abstract long getMillisBetweenPoll();
public abstract long getMillisBetweenPollColl();
public abstract boolean poll(Coll<?> coll, long iterationCount);
}

View File

@ -0,0 +1,45 @@
package com.massivecraft.massivecore.store;
import com.massivecraft.massivecore.MassiveCoreMConf;
public class ModificationPollerLocal extends ModificationPollerAbstract
{
// -------------------------------------------- //
// INSTANCE
// -------------------------------------------- //
private static ModificationPollerLocal i = null;
public static ModificationPollerLocal get()
{
if (i == null || !i.isAlive()) i = new ModificationPollerLocal();
return i;
}
// -------------------------------------------- //
// OVERRIDE
// -------------------------------------------- //
@Override
public long getMillisBetweenPoll()
{
return MassiveCoreMConf.get().millisBetweenLocalPoll;
}
@Override
public long getMillisBetweenPollColl()
{
return MassiveCoreMConf.get().millisBetweenLocalPollColl;
}
@Override
public boolean poll(Coll<?> coll, long iterationCount)
{
if (iterationCount % coll.getLocalPollFrequency() == 0)
{
coll.identifyLocalModifications(false);
return true;
}
return false;
}
}

View File

@ -0,0 +1,43 @@
package com.massivecraft.massivecore.store;
import com.massivecraft.massivecore.MassiveCoreMConf;
public class ModificationPollerRemote extends ModificationPollerAbstract
{
// -------------------------------------------- //
// INSTANCE
// -------------------------------------------- //
private static ModificationPollerRemote i = null;
public static ModificationPollerRemote get()
{
if (i == null || !i.isAlive()) i = new ModificationPollerRemote();
return i;
}
// -------------------------------------------- //
// OVERRIDE
// -------------------------------------------- //
@Override
public long getMillisBetweenPoll()
{
return MassiveCoreMConf.get().millisBetweenRemotePoll;
}
@Override
public long getMillisBetweenPollColl()
{
return MassiveCoreMConf.get().millisBetweenRemotePollColl;
}
@Override
public boolean poll(Coll<?> coll, long iterationCount)
{
//TODO: This could probably be true.
coll.identifyRemoteModifications(false);
return true;
}
}

View File

@ -0,0 +1,7 @@
package com.massivecraft.massivecore.store;
public interface PusherColl
{
public void init();
public void deinit();
}

View File

@ -0,0 +1,227 @@
package com.massivecraft.massivecore.store;
import java.io.File;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchEvent.Kind;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import com.massivecraft.massivecore.MassiveCore;
public class PusherCollFlatfile extends Thread implements PusherColl
{
// -------------------------------------------- //
// CONSTANTS
// -------------------------------------------- //
public static final Kind<?>[] EVENT_TYPES = {StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY};
// -------------------------------------------- //
// FIELDS
// -------------------------------------------- //
private final String folderUri;
private final WatchService watcher;
private final Map<WatchKey, Path> keys;
private final Coll<?> coll;
private final Set<String> handledIds = new HashSet<String>();
// -------------------------------------------- //
// OVERRIDE: THREAD
// -------------------------------------------- //
@SuppressWarnings("unchecked")
public void run()
{
//MassiveCore.get().log("Starting Pusher for " + coll.getBasename());
while (true)
{
try
{
WatchKey key = this.watcher.take();
Path dir = this.keys.get(key);
if (dir == null)
{
//System.err.println("WatchKey not recognized!!");
continue;
}
for (WatchEvent<?> event : key.pollEvents())
{
handleEvent((WatchEvent<Path>) event, dir);
}
boolean valid = key.reset();
if ( ! valid) this.keys.remove(key);
}
catch (InterruptedException e)
{
// We've been interrupted. Lets bail.
MassiveCore.get().log("Stopping Pusher for " + this.coll.getDebugName());
return;
}
catch (Exception e)
{
System.out.println("Pusher error for" + this.coll.getDebugName());
e.printStackTrace();
}
finally
{
handledIds.clear();
}
}
}
public void handleEvent(WatchEvent<Path> event, Path root) throws IOException
{
// Note on my computer running OSX El Capitan
// updates is sent every 10 seconds.
// So the information /could/ be a little less than 10 seconds old.
// Thus we cannot trust the kind of the modification event.
// But only trust that the file has been modified in some way.
// Context for directory entry event is the file name of entry
Path context = event.context();
Path fullPath = root.resolve(context);
File file = fullPath.toFile();
long mtime = file.lastModified();
// Id
String id = context.toString();
if ( ! isIdOk(id)) return;
id = id.substring(0, id.length() - JsonFileFilter.DOTJSON.length());
// Most registered modifications here will actually be something done locally.
// So most of the time we should just ignore this.
Modification mod = this.coll.examineIdFixed(id, mtime);
//System.out.println("old: " + coll.getMetaCreative(id).getMtime() + "new: " + mtime);
//System.out.println(String.format("Coll %s found %s on %s", this.coll.getBasename(), mod, id));
// At LOCAL_ATTACH we get NONE.
// At LOCAL_ALTER we get NONE.
// At LOCAL_DETACH we get UNKNOWN.
// At isDefault we get NONE.
// At REMOTE_DETACH we get REMOTE_DETACH.
// At REMOTE_ATTACH we get REMOTE_ATTACH (?)
// At REMOTE_ALTER we get REMOTE_ALTER.
switch(mod)
{
// It was modified locally.
case NONE:
case UNKNOWN:
return;
// It was modified remotely.
case REMOTE_ATTACH:
case REMOTE_DETACH:
case REMOTE_ALTER:
// Usually we don't use the results of an async examination
// because the entity might be modified locally during the examination.
// But now we only care about remote modifications. Which should be accurate.
// However from examination to synchronization there can go a whole tick.
// And if a remote detach occurs during that tick, the system would break if we said the modification was alter or attach.
// Thus we cannot be sure about the modification, when we get to the synchronization point.
coll.putIdentifiedModificationFixed(id, Modification.UNKNOWN);
break;
// Should not happen
case LOCAL_ALTER:
case LOCAL_ATTACH:
case LOCAL_DETACH:
break;
}
}
public boolean isIdOk(String id)
{
// Special files made by the OS and some programs starts with '.'
if (id.charAt(0) == '.') return false;
// It must be a json file
if ( ! id.endsWith(JsonFileFilter.DOTJSON)) return false;
// If adding this id DIDN'T have an effect.
// It was already there and already handled.
if ( ! handledIds.add(id)) return false;
return true;
}
// -------------------------------------------- //
// OVERRIDE: PUSHER
// -------------------------------------------- //
@Override
public void init()
{
this.start();
}
@Override
public void deinit()
{
this.interrupt();
}
// -------------------------------------------- //
// REGISTER
// -------------------------------------------- //
public void register(Path path) throws IOException
{
if (Files.notExists(path)) throw new IllegalArgumentException(path.toString() + " does not exist.");
WatchKey key = path.register(watcher, EVENT_TYPES);
//System.out.format("register: %s\n", path);
keys.put(key, path);
}
public void registerAll(final Path start) throws IOException
{
// register directory and sub-directories
Files.walkFileTree(start, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException
{
register(dir);
return FileVisitResult.CONTINUE;
}
});
}
// -------------------------------------------- //
// CONSTRUCT
// -------------------------------------------- //
public PusherCollFlatfile(Coll<?> coll) throws IOException
{
Db db = coll.getDb();
if ( ! (db instanceof DbFlatfile)) throw new IllegalArgumentException("Coll doesn't use flatfile database");
this.folderUri = db.getDbName() + "/" + coll.getBasename();
this.watcher = FileSystems.getDefault().newWatchService();
this.keys = new HashMap<WatchKey, Path>();
this.coll = coll;
// We must make sure that the paths exists,
// otherwise we cannot register to listen for changes.
// So now we'll have some empty directories.
Path path = Paths.get(this.folderUri);
path.toFile().mkdirs();
this.register(path);
}
}