/*
 * Engine.java
 *
 * Created on 25 May 2006, 16:50
 *
 * Bouillon2 project. Open control - controlled openness.
 * (c) Victor Grishchenko, http://oc-co.org
 */

// NOTE(review): this copy of the file appears to have passed through a
// tag-stripping filter: generic type arguments are missing throughout, and
// in requestPiece, acceptOpinion and routeToken the text between a '<' and
// a later '>' is gone. Those spots are marked "TRUNCATED" below and are
// reproduced exactly as found; restore them from version control rather
// than by guesswork.

package bouillon2.server;

import bouillon2.server.token.*;
import com.sleepycat.je.*;
import com.sleepycat.persist.*;
import java.util.*;
import org.apache.log4j.Logger;

/**
 * Per-JID engine: owns an entity store named after the JID's user part and
 * handles opinion requests, piece (body) requests and opinion tokens by
 * answering from the local indexes and relaying to contacts through
 * {@link Core#routeToken}.
 *
 * @author gritzko
 */
public class Engine implements Destination {

    /** JID this engine acts for; compared against token senders/authors. */
    JID jid;
    /** Owning core; supplies the environment, the shared store and routing. */
    Core core;
    /** This engine's own entity store (opened in the constructor). */
    EntityStore store;

    // Primary indexes. pieces and bodies are obtained from core.store (shared);
    // requests, opinions, contacts and body_requests come from this engine's
    // own store. All are keyed by String (see the constructor).
    PrimaryIndex pieces;
    PrimaryIndex requests;
    PrimaryIndex opinions;
    PrimaryIndex contacts;
    PrimaryIndex bodies;
    PrimaryIndex body_requests;

    // Secondary indexes; the secondary key names are given in the constructor.
    SecondaryIndex requestsByPOID;
    SecondaryIndex requestsByOVID;
    SecondaryIndex idByParentId;
    SecondaryIndex opinionsByPOID;
    SecondaryIndex opinionsByPOVID;
    SecondaryIndex bodyRequestsByPOVID;
    // FIXED: 4 out of 12 indexes are in use
    // NOTE: copies of the same strings are saved multiple times
    //       in numerous BDB databases
    // FIXME: move pieces, bodies to the shared database

    /** Logger named "engine." + user part of the JID. */
    Logger log;

    /**
     * Creates a new instance of Engine.
     *
     * Opens (creating if necessary) a non-transactional, writable entity
     * store named after {@code jid.getUser()} in the core's environment and
     * resolves all primary and secondary indexes.
     *
     * @param core owning core; its environment and shared store are used
     * @param jid  JID this engine acts for
     * @throws DatabaseException if the store or an index cannot be opened
     */
    public Engine(Core core, JID jid) throws DatabaseException {
        this.jid = jid;
        this.core = core;
        Environment env = core.getEnvironment();
        StoreConfig storeConfig = new StoreConfig();
        storeConfig.setAllowCreate(true);
        storeConfig.setReadOnly(false);
        storeConfig.setTransactional(false);
        //storeConfig.setTransactional(true);
        store = new EntityStore(env, jid.getUser(), storeConfig);
        // shared (core-level) indexes
        pieces = core.store.getPrimaryIndex(String.class,Piece.class);
        idByParentId =
            core.store.getSecondaryIndex(pieces,String.class,"parent_id");
        bodies = core.store.getPrimaryIndex(String.class,PieceToken.class);
        // per-engine indexes
        requests = store.getPrimaryIndex(String.class,OpinionRequest.class);
        opinions = store.getPrimaryIndex(String.class,OpinionToken.class);
        contacts = store.getPrimaryIndex(String.class,Contact.class);
        body_requests = store.getPrimaryIndex(String.class,PieceRequest.class);
        requestsByPOID = store.getSecondaryIndex(requests,String.class,"po_id");
        requestsByOVID = store.getSecondaryIndex(requests,String.class,"ov_id");
        opinionsByPOID = store.getSecondaryIndex(opinions,String.class,"po_id");
        opinionsByPOVID =
            store.getSecondaryIndex(opinions,String.class,"pov_id_sc");
        bodyRequestsByPOVID =
            store.getSecondaryIndex(body_requests,String.class,"pov_id");
        log = Logger.getLogger("engine."+jid.getUser());
    }

    // engine2engine

    /**
     * Walks every stored opinion whose "po_id" secondary key equals
     * {@code po_id} and, per opinion:
     * <ul>
     * <li>foreign and not aged: sent as a response to {@code req}, unless
     *     this is a refresh;</li>
     * <li>foreign and aged: deleted from the store;</li>
     * <li>own and fresh: sent as a response, unless this is a refresh;</li>
     * <li>own and no longer fresh: handed to {@code core.routeToken} again
     *     ("opinion refresh").</li>
     * </ul>
     *
     * @param po_id       secondary key value to look opinions up by
     * @param req         request being answered
     * @param req_refresh when true no responses are sent; aged opinions are
     *                    still deleted and stale own opinions still re-routed
     * @throws DatabaseException on store errors
     */
    private void respondBy (String po_id, OpinionRequest req,
            boolean req_refresh) throws DatabaseException {
        EntityIndex opinions_on_kid = opinionsByPOID.subIndex(po_id);
        EntityCursor opcur = opinions_on_kid.entities();
        // NOTE(review): the cursor is closed only on the normal path; an
        // exception thrown inside the loop leaves it open (no try/finally).
        for ( OpinionToken o : opcur )
            if (!isOwn(o)) {
                if (!o.isAged()) {
                    //if (!o.getSender().equals(req.getSender())) UNCOMM&TEST
                    if (!req_refresh) doResponse (req,o);
                } else {
                    debug("aged op by "+o.getAuthor());
                    opcur.delete();
                }
            } else {
                if (o.isFresh()) {
                    if (!req_refresh) doResponse (req,o);
                } else
                    core.routeToken(o); // opinion refresh
            }
        opcur.close();
    }

    /**
     * Processes an incoming opinion request: de-duplicates it against the
     * stored request with the same key, stores it, answers it from the local
     * opinions and relays it further when allowed.
     *
     * Steps, in order:
     * <ol>
     * <li>If a stored request with the same key has a threshold not greater
     *     than the new one: a still-fresh one makes this call a no-op
     *     ("double req"); a non-fresh, non-aged one turns this call into a
     *     refresh (nothing is answered).</li>
     * <li>Requests sharing the same POID are scanned: aged ones are deleted;
     *     a live one from a different sender with a threshold not greater
     *     than the new one marks the new request as "eaten" (not relayed).</li>
     * <li>The request is stored.</li>
     * <li>If the request's PID is not "/": the parent piece must be known
     *     (otherwise the method returns); opinions on each child piece are
     *     used to respond; the request is relayed, once per distinct sender,
     *     to the senders of opinions on the parent piece.</li>
     * <li>Otherwise (root request): opinions on the request's POID are used
     *     to respond and the request is relayed to every contact.</li>
     * <li>Aged requests are purged via {@link #doMaintanance()}.</li>
     * </ol>
     * Relaying additionally requires the request's reverse distance to exceed
     * the corresponding {@code EngineParams} threshold. The early returns in
     * steps 1 and 4 skip the maintenance step.
     *
     * @param req the request to process
     * @throws DatabaseException on store errors
     */
    public void requestOpinions(OpinionRequest req) throws DatabaseException {
        boolean refresh = false; // refresh: need not to respond
        boolean eaten = false; // eaten by a previous request: need not to relay
        OpinionRequest prev = requests.get (req.getKey());
        if (prev!=null && prev.getThreshold()<=req.getThreshold()) {
            if (!prev.isFresh()) { // aged requests can't be refreshed
                if (!prev.isAged()) {
                    refresh = true;
                    debug("refresh of "+prev);
                }
            } else {
                debug("double req");
                return;
            }
        }
        EntityIndex similar_requests = requestsByPOID.subIndex(req.getPOID());
        EntityCursor cursor = similar_requests.entities();
        for (OpinionRequest ex : cursor)
            if (!ex.isAged()) {
                if ( ex.getThreshold()<=req.getThreshold() &&
                        !ex.getSender().equals(req.getSender()) ) {
                    debug ("eaten by "+ex.getSender()+" of "+
                        (req.getTimeReceived()-ex.getTimeReceived())+"ms diff");
                    eaten = true;
                    break;
                }
            } else {
                // NOTE(review): the entity deleted here is `ex`, but the
                // message prints the incoming `req` — probably meant `ex`.
                debug("aged req: "+req);
                cursor.delete();
            }
        cursor.close();
        // PUT request
        requests.put(req);
        if ( ! req.getPID().equals("/") ) {
            Piece piece = pieces.get(req.getPID());
            if (piece==null) {
                debug ("request on kids of unknown parent: "+req.getPID());
                return;
            }
            // respond
            EntityIndex kids = idByParentId.subIndex (req.getPID());
            EntityCursor kids_cursor = kids.entities();
            for ( Piece kid : kids_cursor ) {
                String poid = kid.getPOID();
                this.respondBy(poid,req,refresh);
            }
            kids_cursor.close();
            // relay
            if ( !eaten && req.getReverseDistance() >
                    EngineParams.PIECE_REQ_RELAY_RD_THRESHOLD) {
                String piece_po = piece.getPOID();
                EntityIndex opinions_on_piece =
                    opinionsByPOID.subIndex(piece_po);
                EntityCursor popcur = opinions_on_piece.entities();
                // senders already asked, so each is requested only once
                Set requested = new HashSet(); // hash set!
                for ( OpinionToken o : popcur ) {
                    if (!requested.contains(o.getSender())) {
                        requested.add (o.getSender());
                        doRequest (o,req);
                    }
                }
                popcur.close();
            }
        } else { // root req, e.g. for "/page", not "page/"
            // respond
            this.respondBy(req.getPOID(),req,refresh);
            // relay
            if ( !eaten && req.getReverseDistance() >
                    EngineParams.PAGE_REQ_RELAY_RD_THRESHOLD) {
                EntityCursor k = contacts.entities();
                for (Contact s : k)
                    doRelay (s,req);
                k.close();
            }
        }
        // future: use / opinions
        // *** if something
        doMaintanance();
    }

    /**
     * Deletes every aged opinion request from the requests index (full scan).
     *
     * @throws DatabaseException on store errors
     */
    private void doMaintanance () throws DatabaseException {
        // 4 foreach ***
        EntityCursor req_cur = requests.entities();
        for (OpinionRequest req : req_cur)
            if (req.isAged())
                req_cur.delete();
        req_cur.close();
    }

    /**
     * Handles a request for a piece body. If the body is stored locally
     * (looked up by the request's POVID) it is sent back at once; otherwise
     * the request is stored and forwarded to the sender of one opinion chosen
     * among the opinions on the same POVID. When no such opinion exists the
     * request stays stored but is not forwarded.
     *
     * TODO: send requests in different directions
     *
     * @param req the piece request
     * @throws DatabaseException on store errors
     */
    public void requestPiece(PieceRequest req) throws DatabaseException {
        PieceToken body = bodies.get(req.getPOVID());
        if (body!=null){
            debug("has a body");
            doResponse (req,body);
            return;
        } else {
            body_requests.put (req);
            // body request relaying
            EntityCursor opc =
                this.opinionsByPOVID.subIndex(req.getPOVID()).entities();
            OpinionToken send_to = null;
            for (OpinionToken o : opc) {
                //log.debug("considering "+o);
                // TRUNCATED — NOTE(review): text is missing between the first
                // "send_to.getReverseDistance()" and the second one (likely a
                // '<' comparison and further terms); the selection rule cannot
                // be read from this copy. Left exactly as found.
                if ( send_to==null ||
                        (!o.getSender().equalEngine(req.getSender()) &&
                        send_to.getReverseDistance()send_to.getReverseDistance()*0.8) )
                    send_to = o;
            }
            opc.close();
            if (send_to==null) {
                debug("have no idea whom to ask");
                return;
            } else {
                debug("asking "+send_to.getSender());
                doRequest (send_to,req);
            }
        }
    }

    /**
     * Updates the stored contacts according to how well the opinions they
     * sent agree with {@code eval}.
     *
     * For each stored opinion with the same POVID, an agreement value is
     * computed as min(relevance)/max(relevance) of the two opinions (1.0 when
     * both relevances are below 0.1). If the opinion's sender maps to a known
     * contact, {@code contact.update(agreement, base)} is applied with
     * {@code base} = the opinion's reverse distance, and the contact is
     * written back.
     *
     * @param eval the opinion to compare the stored ones against
     * @throws DatabaseException on store errors
     */
    public void updateReputations (OpinionToken eval)
            throws DatabaseException {
        EntityIndex other_opinions =
            this.opinionsByPOVID.subIndex(eval.getPOVID());
        EntityCursor ooc = other_opinions.entities();
        for (OpinionToken rec : ooc) {
            JID cnt = rec.getSender();
            double base = rec.getReverseDistance(); // inaccurate; rev_dist was already
                                                    // decreased by the contact's reputation
            double agreement;
            if (eval.getRelevance()<0.1 && rec.getRelevance()<0.1)
                agreement = 1.0; // originally, to avoid /0
            else
                agreement = Math.min(eval.getRelevance(),rec.getRelevance()) /
                    Math.max(eval.getRelevance(),rec.getRelevance());
            Contact contact = contacts.get(cnt.getEngineJID().toString());
            if (contact!=null) {
                contact.update (agreement,base);
                contacts.put (contact);
            }
        }
        ooc.close();
    }

    /**
     * @param ismine JID to test
     * @return true if the engine part of {@code ismine} equals this
     *         engine's JID
     */
    public boolean isMine (JID ismine) {
        return ismine.getEngineJID().equals(this.jid);
    }

    /**
     * @param op opinion to test
     * @return true if the opinion's author belongs to this engine
     *         (see {@link #isMine})
     */
    public boolean isOwn (OpinionToken op) {
        return isMine(op.getAuthor());
    }

    // TODO: check whether sender==self
    // TODO: obsolescence checks, cleaning
    /**
     * Accepts an incoming opinion token.
     *
     * Readable part of the logic:
     * <ul>
     * <li>If an opinion with the same key is stored and came from the same
     *     sender, an identical-relevance token is dropped while the stored
     *     copy is still fresh.</li>
     * <li>If it came from a different sender: an own opinion is accepted
     *     only from this engine's own JIDs, and a token with a smaller
     *     reverse distance than the stored one is dropped unless the stored
     *     one is aged.</li>
     * <li>If nothing is stored and the token is an own opinion, contact
     *     reputations are updated from it.</li>
     * <li>The token is stored. For a non-root token (PID other than "/") the
     *     parent piece must be known, otherwise the method returns after the
     *     put; a Piece entry for the token's ID is created when its OID is
     *     not yet in the pieces index.</li>
     * </ul>
     * NOTE(review): everything after that point is damaged in this copy —
     * see the TRUNCATED markers in the body.
     *
     * @param token the opinion to accept
     * @throws DatabaseException on store errors
     */
    public void acceptOpinion(OpinionToken token) throws DatabaseException {
        ID id = token.getID();
        // opinion lifecycle
        OpinionToken prev = opinions.get(token.getKey());
        if (prev!=null) {
            debug("has prev");
            if ( prev.getSender().equals (token.getSender()) ) { // from the same contact
                if (prev.getRelevance()==token.getRelevance() &&
                        prev.isFresh()) {// double
                    debug ("double on "+token.getPOVID());
                    return; // have a fresh copy, need not to process
                }
            } else {
                if ( isOwn(token) && !isMine(token.getSender()) ) {
                    debug("got own opinion from a different sender");
                    return; // only my own agents may overwrite my opinions
                }
                debug(prev.getReverseDistance()+"\t"+token.getReverseDistance()+
                        "\t"+prev.isAged());
                if (prev.getReverseDistance()>token.getReverseDistance() &&
                        !prev.isAged()) {
                    debug ("weaker copy on "+token.getPOVID());
                    return; // weaker copy may overwrite aged tokens only
                }
            }
        } else // prev==null
            if ( isOwn(token) )
                updateReputations (token);
        // PUT
        opinions.put(token);
        if (!token.getPID().equals("/")) { // "node/kid"
            Piece parent = pieces.get(token.getPID());
            if (parent==null) {
                debug("dropping separate piece (parent unknown): "
                        +id.toString());
                return;
            }
            if (!pieces.contains(id.getOID())) {
                Piece new_root = new Piece(id);
                this.pieces.put(new_root);
            }
            // this opinion fits some requests as an answer
            EntityIndex requests_on_parent =
                requestsByPOID.subIndex(token.getPID()+"/");
            EntityCursor ropcur = requests_on_parent.entities();
            // NOTE(review): from here to the end of this method the text is
            // damaged. Each "if (req.getThreshold()" below breaks off and runs
            // straight into an unrelated declaration; loop bodies, cursor
            // close() calls and closing braces are missing, and
            // requests_on_subject / roscur appear twice. The final loop passes
            // `token` together with a PieceRequest to doResponse, and the only
            // visible overload taking a PieceRequest expects a PieceToken, so
            // that tail presumably belonged to a separate piece-accepting
            // method whose header is lost — TODO confirm against the original.
            // Everything is reproduced exactly as found.
            for (OpinionRequest req : ropcur)
                // TRUNCATED
                if (req.getThreshold() requests_on_subject =
                requestsByPOID.subIndex(token.getID().getOID()+"/");
            EntityCursor roscur = requests_on_subject.entities();
            for (OpinionRequest req : roscur) // FIXME ***
                // TRUNCATED
                if (req.getThreshold() requests_for_page =
                requestsByPOID.subIndex(token.getPOID());
            EntityCursor rpcur = requests_for_page.entities();
            for (OpinionRequest req : rpcur)
                // TRUNCATED
                if (req.getThreshold() EngineParams.PAGE_ADVERT_RELAY_RD_THRESHOLD) {
                    // relay the token to every contact except its sender
                    EntityCursor cc = contacts.entities();
                    for (Contact s : cc)
                        if (!token.getSender().equals(s.getJID()))
                            doRelay (s,token);
                    cc.close();
                }
            // whether the contact became promising
            EntityIndex requests_on_subject =
                requestsByPOID.subIndex(token.getID().getOID()+"/");
            EntityCursor roscur = requests_on_subject.entities();
            for (OpinionRequest req : roscur) // FIXME ***
                // TRUNCATED
                if (req.getThreshold() requests_on_id =
                bodyRequestsByPOVID.subIndex(token.getPOVID());
            EntityCursor ricur = requests_on_id.entities();
            // answer every stored body request on this POVID, then drop it
            for (PieceRequest req : ricur) {
                doResponse (req,token);
                ricur.delete();
            }
            ricur.close();
    }

    /** @return the JID this engine acts for */
    public JID getJID () {
        return jid;
    }

    // communication

    /**
     * Sends {@code response} to the sender of {@code req} via the core.
     *
     * The outgoing token is built with the reputation of the requester's
     * contact (1.0 when the requester is not an engine or is not a known
     * contact). The "from" JID is the original sender of the response when
     * the requester belongs to this engine, and this engine's JID otherwise.
     *
     * @param req      request being answered
     * @param response opinion to send
     * @throws DatabaseException on store errors
     */
    private void doResponse (OpinionRequest req, OpinionToken response)
            throws DatabaseException {
        double rep_to = 1.0;
        if (req.getSender().isEngine()) {
            Contact cnt = contacts.get(req.getSender().toString());
            if (cnt!=null)
                rep_to = cnt.getReputation();
        }
        JID from = this.isMine(req.getSender()) ?
            response.getSender() : getJID();
        OpinionToken further = new OpinionToken
            (response,from,req.getSender(),rep_to,1.0);
        core.routeToken(further);
    }

    /**
     * Forwards an opinion request to the sender of opinion {@code o}.
     * Nothing is sent when that sender is this engine's JID, is not an
     * engine, or is not a stored contact.
     *
     * @param o       opinion whose sender is to be asked
     * @param request request to forward
     * @throws DatabaseException on store errors
     */
    private void doRequest (OpinionToken o, OpinionRequest request)
            throws DatabaseException {
        if (o.getSender().equals(this.jid))
            return;
        if (o.getSender().isEngine()) {
            Contact cnt = contacts.get(o.getSender().toString());
            if (cnt==null)
                return;
            double rep_to = cnt.getReputation();
            OpinionRequest further = new OpinionRequest
                (request,getJID(),o.getSender(),rep_to,1.0);
            core.routeToken(further);
        } else
            debug("agent can't be requested; dropping");
    }

    /**
     * Sends a piece body to the sender of {@code req} via the core, using
     * the requester's contact reputation (1.0 when the requester is not an
     * engine or is not a known contact).
     *
     * @param req  piece request being answered
     * @param body body to send
     * @throws DatabaseException on store errors
     */
    private void doResponse(PieceRequest req, PieceToken body)
            throws DatabaseException {
        double rep_to = 1.0;
        if (req.getSender().isEngine()) {
            Contact cnt = contacts.get(req.getSender().toString());
            if (cnt!=null)
                rep_to = cnt.getReputation();
        }
        PieceToken further = new PieceToken
            (body,getJID(),req.getSender(),rep_to,1.0);
        core.routeToken(further);
    }

    // etc

    /**
     * Creates an engine and stores the root piece "/" in the pieces index.
     *
     * @param core owning core
     * @param jid  JID the engine acts for
     * @return the new engine
     * @throws DatabaseException on store errors
     */
    static Engine initEngine(Core core, JID jid) throws DatabaseException {
        Engine new_eng = new Engine (core,jid);
        new_eng.pieces.put (new Piece("/"));
        return new_eng;
    }

    /**
     * Closes this engine's own store and drops the reference to it. The
     * indexes taken from {@code core.store} are not touched here.
     *
     * @throws DatabaseException if closing the store fails
     */
    public void close () throws DatabaseException {
        this.store.close();
        this.store = null;
    }

    /**
     * Adds an engine JID as a contact and relays every non-aged stored
     * opinion request to it.
     *
     * NOTE(review): when the contact already exists this only logs the fact
     * and then still stores a fresh {@code Contact}, replacing the existing
     * entry — confirm whether an early return was intended. The lookup and
     * the stored key use the JID as passed in, whereas getContact/hasContact
     * look up by {@code getEngineJID()} — verify the two always coincide.
     *
     * @param jid contact to add
     * @throws IllegalArgumentException if {@code jid} is not an engine JID
     *         or refers to this engine
     * @throws DatabaseException on store errors
     */
    public void addContact(JID jid) throws DatabaseException {
        if (!jid.isEngine())
            throw new IllegalArgumentException("not an engine jid: "+jid);
        if (jid.getEngineJID().equals(this.jid))
            throw new IllegalArgumentException("it's me: "+jid);
        if (contacts.get(jid.toString())!=null) {
            debug (this.getJID()+" already has contact with "+jid);
        }
        Contact bc = new Contact (jid);
        contacts.put(bc);
        debug("contact added to "+this.jid.getUser()+": "+jid);
        EntityCursor req_cur = this.requests.entities();
        for (OpinionRequest req : req_cur)
            if (!req.isAged())
                this.doRelay(bc,req);
        req_cur.close();
    }

    /**
     * Relays an opinion request to a contact, weighted by the contact's
     * reputation. A null contact is logged and ignored.
     *
     * @param cnt contact to relay to
     * @param req request to relay
     */
    private void doRelay(Contact cnt, OpinionRequest req) {
        if (cnt==null) {
            debug("relaying to null contact; dropping");
            return;
        }
        double rep_to = cnt.getReputation();
        OpinionRequest further = new OpinionRequest
            (req,getJID(),cnt.getJID(),rep_to,1.0);
        core.routeToken(further);
    }

    /**
     * Forwards a piece request to the sender of opinion {@code op}. Nothing
     * is sent when that sender is this engine's JID, is not an engine, or is
     * not a stored contact.
     *
     * @param op  opinion whose sender is to be asked
     * @param req piece request to forward
     * @throws DatabaseException on store errors
     */
    private void doRequest (OpinionToken op, PieceRequest req)
            throws DatabaseException {
        if (op.getSender().equals(this.jid))
            return;
        if (!op.getSender().isEngine()) {
            debug ("only engines can be requested");
            return;
        }
        String route_to = op.getSender().toString();
        Contact cnt = contacts.get(route_to);
        if (cnt==null) {
            debug("requesting non-existent contact"+route_to+"; dropping");
            return;
        }
        double rep_to = cnt.getReputation();
        PieceRequest further = new PieceRequest
            (req,getJID(),op.getSender(),rep_to,1.0);
        core.routeToken(further);
    }

    /**
     * Relays an opinion token to a contact, weighted by the contact's
     * reputation. A null contact is logged and ignored.
     *
     * @param cnt contact to relay to
     * @param op  opinion to relay
     */
    private void doRelay(Contact cnt, OpinionToken op) {
        if (cnt==null) {
            debug("relaying to null contact; dropping");
            return;
        }
        double rep_to = cnt.getReputation();
        OpinionToken further = new OpinionToken
            (op,getJID(),cnt.getJID(),rep_to,1.0);
        core.routeToken(further);
    }

    /**
     * Entry point for tokens delivered to this engine.
     *
     * A token sent by another engine is dropped unless that engine is a
     * stored contact; otherwise the contact's reputation is taken as
     * {@code bck_inc} (1.0 for tokens not sent by a foreign engine).
     *
     * NOTE(review): the remainder of this method — the per-token-type
     * dispatch — is missing from this copy; see the TRUNCATED marker.
     *
     * @param token the incoming token
     * @throws Exception as declared; the throwing code is not visible here
     */
    public void routeToken(Token token) throws Exception {
        double bck_inc = 1.0;
        if (token.getSender().isEngine() &&
                !token.getSender().equals(jid) ) {
            Contact cnt = contacts.get (token.getSender().toString());
            if (cnt==null) {
                debug("JID unknown to "+jid.getUser()+
                        ": "+token.getSender()+"; dropping");
                return;
            }
            bck_inc = cnt.getReputation();
        }
        if (token instanceof OpinionToken) {
            // TRUNCATED — NOTE(review): the text breaks off after "bck_inc"
            // and resumes inside what looks like the declaration of the
            // `pending` set used by addContactPending/addContactIfPending.
            // The rest of routeToken and the start of that declaration are
            // lost. Left exactly as found.
            if (bck_inc pending = new HashSet();

    /**
     * Remembers a JID so that a later {@link #addContactIfPending} call adds
     * it as a contact.
     *
     * @param jid JID to remember
     */
    void addContactPending(JID jid) {
        pending.add(jid);
    }

    /**
     * Adds {@code jid} as a contact if, and only if, it was previously
     * registered with {@link #addContactPending}; the pending entry is
     * removed first.
     *
     * @param jid JID to promote to a contact
     * @throws DatabaseException on store errors
     */
    public void addContactIfPending (JID jid) throws DatabaseException {
        if (pending.contains(jid)) {
            pending.remove(jid);
            addContact(jid);
        }
    }

    /**
     * Writes a debug message to this engine's logger, if one is set.
     *
     * @param s message text
     */
    private void debug (String s) { // FIXME: concatenation
        if (log!=null)
            log.debug (s);
    }

    /**
     * Removes a contact. A missing contact is logged; the delete is then
     * issued regardless.
     *
     * @param jid contact to remove
     * @throws IllegalArgumentException if {@code jid} is not an engine JID
     * @throws DatabaseException on store errors
     */
    public void removeContact(JID jid) throws DatabaseException {
        if (!jid.isEngine())
            throw new IllegalArgumentException("not an engine jid: "+jid);
        if (contacts.get(jid.toString())==null)
            debug (this.getJID()+" has no contact with "+jid);
        contacts.delete(jid.toString());
        debug("contact removed from "+this.jid.getUser()+": "+jid);
    }

    /**
     * @param jid any JID; its engine part is used as the lookup key
     * @return the stored contact, or whatever {@code contacts.get} yields
     *         for an unknown key (callers in this file test it for null)
     * @throws DatabaseException on store errors
     */
    public Contact getContact (JID jid) throws DatabaseException {
        return contacts.get(jid.getEngineJID().toString());
    }

    /**
     * @param jid any JID; its engine part is used as the lookup key
     * @return true if a contact is stored under that key
     * @throws DatabaseException on store errors
     */
    public boolean hasContact (JID jid) throws DatabaseException {
        return contacts.contains(jid.getEngineJID().toString());
    }

    /** @return the contacts index exposed through {@code sortedMap()} */
    public Map getContacts() {
        return contacts.sortedMap();
    }

    /**
     * Pins a contact's reputation: sets it to {@code rep} and sets the
     * contact's base to positive infinity, then stores the contact.
     *
     * NOTE(review): the contact is dereferenced without a null check, so an
     * unknown {@code jid} would presumably end in a NullPointerException —
     * confirm the intended contract.
     *
     * @param jid contact to modify
     * @param rep reputation value to set
     * @throws DatabaseException on store errors
     */
    public void setManualReputation(JID jid, double rep)
            throws DatabaseException {
        Contact c = this.getContact(jid);
        c.setReputation(rep);
        c.setBase(Double.POSITIVE_INFINITY);
        contacts.put(c);
    }

    /**
     * Undoes the base change made by {@link #setManualReputation}: resets the
     * contact's base to {@code Contact.DEFAULT_BASE} and stores the contact.
     * The reputation value itself is left as it is.
     *
     * NOTE(review): same missing null check as in setManualReputation.
     *
     * @param jid contact to modify
     * @throws DatabaseException on store errors
     */
    public void releaseManualReputation(JID jid) throws DatabaseException {
        Contact c = this.getContact(jid);
        c.setBase(Contact.DEFAULT_BASE);
        contacts.put(c);
    }

}

// Retired code, kept for reference only (both blocks below are comments).

/*EntityIndex opinions_on_page = opinionsByPOID.subIndex(req.getPOID());
EntityCursor pagcur = opinions_on_page.entities();
for ( OpinionToken o : pagcur ) // TODO o.sender!=req.sender
    doResponse (req,o);
pagcur.close();*/

/** O(N*logN)
public void checkAging() throws DatabaseException {
    EntityCursor oreqc = requests.entities();
    try {
        for (OpinionRequest req = oreqc.first(); req != null;
                req = oreqc.next(LockMode.READ_UNCOMMITTED))
            if (req.isAged()) {
                oreqc.delete();
                debug("aged req by "+req.getAuthor()+" on "+req.getPOVID());
            }
    } finally { oreqc.close(); }
    EntityCursor otokc = opinions.entities();
    try{
        for (OpinionToken tok = otokc.first(); tok!=null;
                tok=otokc.next(LockMode.READ_UNCOMMITTED)) {
            if ( ! isOwn(tok) ) {
                if (tok.isAged())
                    otokc.delete(); // the opinion is no longer supported
            } else if (!tok.isFresh()) { // own opinion probably needs refreshing
                if ( tok.getPID().equals("/") ) {
                    if ( this.requestsByPOID.contains(tok.getPOID()) ||
                            this.requestsByPOID.contains(tok.getID().getOID()+"/") )
                        this.acceptOpinion(tok.touched());
                } else {
                    if ( this.requestsByPOID.contains(tok.getPID()+"/") )
                        this.acceptOpinion(tok.touched()); // NOTE: reputations!
                }
                debug("refreshed "+tok.getPOVID());
            }
        }
    } finally { otokc.close(); }
    EntityCursor treqc = body_requests.entities();
    try {
        for (PieceRequest req = treqc.first(); req != null;
                req = treqc.next(LockMode.READ_UNCOMMITTED))
            if (req.isAged())
                treqc.delete();
    } finally { treqc.close(); }
    /*EntityCursor ttokc = bodies.entities();
    try{
        for (PieceToken tok = ttokc.first(); tok!=null; tok=ttokc.next())
            if (!tok.getSender().noResource().equals(jid) && tok.isAged())
                ttokc.delete();
    } finally { ttokc.close(); }*
}*/