Compare commits
	
		
			2 commits
		
	
	
		
			8ade04e48d
			...
			c37fe88b35
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| c37fe88b35 | |||
| 42af6afb86 | 
					 4 changed files with 30 additions and 21 deletions
				
			
		| 
						 | 
					@ -20,7 +20,7 @@ main = do
 | 
				
			||||||
    -- ToDo: load persisted caches, bootstrapping nodes …
 | 
					    -- ToDo: load persisted caches, bootstrapping nodes …
 | 
				
			||||||
    (serverSock, thisNode) <- fediChordInit conf
 | 
					    (serverSock, thisNode) <- fediChordInit conf
 | 
				
			||||||
    -- currently no masking is necessary, as there is nothing to clean up
 | 
					    -- currently no masking is necessary, as there is nothing to clean up
 | 
				
			||||||
    cacheWriterThread <- forkIO $ cacheWriter thisNode
 | 
					    nodeCacheWriterThread <- forkIO $ nodeCacheWriter thisNode
 | 
				
			||||||
    -- try joining the DHT using one of the provided bootstrapping nodes
 | 
					    -- try joining the DHT using one of the provided bootstrapping nodes
 | 
				
			||||||
    joinedState <- tryBootstrapJoining thisNode
 | 
					    joinedState <- tryBootstrapJoining thisNode
 | 
				
			||||||
    either (\err -> do
 | 
					    either (\err -> do
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -47,7 +47,7 @@ module Hash2Pub.FediChord (
 | 
				
			||||||
  , mkServerSocket
 | 
					  , mkServerSocket
 | 
				
			||||||
  , mkSendSocket
 | 
					  , mkSendSocket
 | 
				
			||||||
  , resolve
 | 
					  , resolve
 | 
				
			||||||
  , cacheWriter
 | 
					  , nodeCacheWriter
 | 
				
			||||||
  , joinOnNewEntriesThread
 | 
					  , joinOnNewEntriesThread
 | 
				
			||||||
                           ) where
 | 
					                           ) where
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -94,10 +94,12 @@ import           Debug.Trace                   (trace)
 | 
				
			||||||
-- ToDo: load persisted state, thus this function already operates in IO
 | 
					-- ToDo: load persisted state, thus this function already operates in IO
 | 
				
			||||||
fediChordInit :: FediChordConf -> IO (Socket, LocalNodeStateSTM)
 | 
					fediChordInit :: FediChordConf -> IO (Socket, LocalNodeStateSTM)
 | 
				
			||||||
fediChordInit initConf = do
 | 
					fediChordInit initConf = do
 | 
				
			||||||
 | 
					    emptyLookupCache <- newTVarIO Map.empty
 | 
				
			||||||
    let realNode = RealNode {
 | 
					    let realNode = RealNode {
 | 
				
			||||||
            vservers = []
 | 
					            vservers = []
 | 
				
			||||||
          , nodeConfig = initConf
 | 
					          , nodeConfig = initConf
 | 
				
			||||||
          , bootstrapNodes = confBootstrapNodes initConf
 | 
					          , bootstrapNodes = confBootstrapNodes initConf
 | 
				
			||||||
 | 
					          , lookupCacheSTM = emptyLookupCache
 | 
				
			||||||
                            }
 | 
					                            }
 | 
				
			||||||
    realNodeSTM <- newTVarIO realNode
 | 
					    realNodeSTM <- newTVarIO realNode
 | 
				
			||||||
    initialState <- nodeStateInit realNodeSTM
 | 
					    initialState <- nodeStateInit realNodeSTM
 | 
				
			||||||
| 
						 | 
					@ -283,8 +285,8 @@ joinOnNewEntriesThread nsSTM = loop
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-- | cache updater thread that waits for incoming NodeCache update instructions on
 | 
					-- | cache updater thread that waits for incoming NodeCache update instructions on
 | 
				
			||||||
-- the node's cacheWriteQueue and then modifies the NodeCache as the single writer.
 | 
					-- the node's cacheWriteQueue and then modifies the NodeCache as the single writer.
 | 
				
			||||||
cacheWriter :: LocalNodeStateSTM -> IO ()
 | 
					nodeCacheWriter :: LocalNodeStateSTM -> IO ()
 | 
				
			||||||
cacheWriter nsSTM =
 | 
					nodeCacheWriter nsSTM =
 | 
				
			||||||
    forever $ atomically $ do
 | 
					    forever $ atomically $ do
 | 
				
			||||||
        ns <- readTVar nsSTM
 | 
					        ns <- readTVar nsSTM
 | 
				
			||||||
        cacheModifier <- readTQueue $ cacheWriteQueue ns
 | 
					        cacheModifier <- readTQueue $ cacheWriteQueue ns
 | 
				
			||||||
| 
						 | 
					@ -297,8 +299,8 @@ maxEntryAge = 600
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-- | Periodically iterate through cache, clean up expired entries and verify unverified ones
 | 
					-- | Periodically iterate through cache, clean up expired entries and verify unverified ones
 | 
				
			||||||
cacheVerifyThread :: LocalNodeStateSTM -> IO ()
 | 
					nodeCacheVerifyThread :: LocalNodeStateSTM -> IO ()
 | 
				
			||||||
cacheVerifyThread nsSTM = forever $ do
 | 
					nodeCacheVerifyThread nsSTM = forever $ do
 | 
				
			||||||
    putStrLn "cache verify run: begin"
 | 
					    putStrLn "cache verify run: begin"
 | 
				
			||||||
    -- get cache
 | 
					    -- get cache
 | 
				
			||||||
    (ns, cache) <- atomically $ do
 | 
					    (ns, cache) <- atomically $ do
 | 
				
			||||||
| 
						 | 
					@ -308,7 +310,7 @@ cacheVerifyThread nsSTM = forever $ do
 | 
				
			||||||
    -- iterate entries:
 | 
					    -- iterate entries:
 | 
				
			||||||
    -- for avoiding too many time syscalls, get current time before iterating.
 | 
					    -- for avoiding too many time syscalls, get current time before iterating.
 | 
				
			||||||
    now <- getPOSIXTime
 | 
					    now <- getPOSIXTime
 | 
				
			||||||
    forM_ (cacheEntries cache) (\(CacheEntry validated node ts) ->
 | 
					    forM_ (nodeCacheEntries cache) (\(CacheEntry validated node ts) ->
 | 
				
			||||||
        -- case too old: delete (future work: decide whether pinging and resetting timestamp is better)
 | 
					        -- case too old: delete (future work: decide whether pinging and resetting timestamp is better)
 | 
				
			||||||
        if (now - ts) > maxEntryAge
 | 
					        if (now - ts) > maxEntryAge
 | 
				
			||||||
           then
 | 
					           then
 | 
				
			||||||
| 
						 | 
					@ -541,7 +543,7 @@ fediMainThreads sock nsSTM = do
 | 
				
			||||||
    concurrently_
 | 
					    concurrently_
 | 
				
			||||||
        (fediMessageHandler sendQ recvQ nsSTM) $
 | 
					        (fediMessageHandler sendQ recvQ nsSTM) $
 | 
				
			||||||
        concurrently_ (stabiliseThread nsSTM) $
 | 
					        concurrently_ (stabiliseThread nsSTM) $
 | 
				
			||||||
            concurrently_ (cacheVerifyThread nsSTM) $
 | 
					            concurrently_ (nodeCacheVerifyThread nsSTM) $
 | 
				
			||||||
                concurrently_ (convergenceSampleThread nsSTM) $
 | 
					                concurrently_ (convergenceSampleThread nsSTM) $
 | 
				
			||||||
                    concurrently_
 | 
					                    concurrently_
 | 
				
			||||||
                        (sendThread sock sendQ)
 | 
					                        (sendThread sock sendQ)
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -18,6 +18,7 @@ module Hash2Pub.FediChordTypes (
 | 
				
			||||||
  , setSuccessors
 | 
					  , setSuccessors
 | 
				
			||||||
  , setPredecessors
 | 
					  , setPredecessors
 | 
				
			||||||
  , NodeCache
 | 
					  , NodeCache
 | 
				
			||||||
 | 
					  , NodeCacheEntry
 | 
				
			||||||
  , CacheEntry(..)
 | 
					  , CacheEntry(..)
 | 
				
			||||||
  , RingEntry(..)
 | 
					  , RingEntry(..)
 | 
				
			||||||
  , RingMap(..)
 | 
					  , RingMap(..)
 | 
				
			||||||
| 
						 | 
					@ -39,7 +40,7 @@ module Hash2Pub.FediChordTypes (
 | 
				
			||||||
  , rMapToList
 | 
					  , rMapToList
 | 
				
			||||||
  , cacheGetNodeStateUnvalidated
 | 
					  , cacheGetNodeStateUnvalidated
 | 
				
			||||||
  , initCache
 | 
					  , initCache
 | 
				
			||||||
  , cacheEntries
 | 
					  , nodeCacheEntries
 | 
				
			||||||
  , cacheLookup
 | 
					  , cacheLookup
 | 
				
			||||||
  , cacheLookupSucc
 | 
					  , cacheLookupSucc
 | 
				
			||||||
  , cacheLookupPred
 | 
					  , cacheLookupPred
 | 
				
			||||||
| 
						 | 
					@ -144,6 +145,8 @@ data RealNode = RealNode
 | 
				
			||||||
    -- ^ holds the initial configuration read at program start
 | 
					    -- ^ holds the initial configuration read at program start
 | 
				
			||||||
    , bootstrapNodes :: [(String, PortNumber)]
 | 
					    , bootstrapNodes :: [(String, PortNumber)]
 | 
				
			||||||
    -- ^ nodes to be used as bootstrapping points, new ones learned during operation
 | 
					    -- ^ nodes to be used as bootstrapping points, new ones learned during operation
 | 
				
			||||||
 | 
					    , lookupCacheSTM :: TVar LookupCache
 | 
				
			||||||
 | 
					    -- ^ a global cache of looked up keys and their associated nodes
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type RealNodeSTM = TVar RealNode
 | 
					type RealNodeSTM = TVar RealNode
 | 
				
			||||||
| 
						 | 
					@ -284,13 +287,17 @@ class (Eq a, Show a) => HasKeyID a where
 | 
				
			||||||
instance HasKeyID RemoteNodeState where
 | 
					instance HasKeyID RemoteNodeState where
 | 
				
			||||||
    getKeyID = getNid
 | 
					    getKeyID = getNid
 | 
				
			||||||
 | 
					
 | 
				
			||||||
instance HasKeyID CacheEntry where
 | 
					instance HasKeyID a => HasKeyID (CacheEntry a) where
 | 
				
			||||||
    getKeyID (CacheEntry _ ns _) = getNid ns
 | 
					    getKeyID (CacheEntry _ obj _) = getKeyID obj
 | 
				
			||||||
 | 
					
 | 
				
			||||||
instance HasKeyID NodeID where
 | 
					instance HasKeyID NodeID where
 | 
				
			||||||
    getKeyID = id
 | 
					    getKeyID = id
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type NodeCache = RingMap CacheEntry
 | 
					type NodeCacheEntry = CacheEntry RemoteNodeState
 | 
				
			||||||
 | 
					type NodeCache = RingMap NodeCacheEntry
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type LookupCacheEntry = CacheEntry (String, PortNumber)
 | 
				
			||||||
 | 
					type LookupCache = Map.Map NodeID LookupCacheEntry
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-- | generic data structure for holding elements with a key and modular lookup
 | 
					-- | generic data structure for holding elements with a key and modular lookup
 | 
				
			||||||
newtype RingMap a = RingMap { getRingMap :: HasKeyID a => Map.Map NodeID (RingEntry a) }
 | 
					newtype RingMap a = RingMap { getRingMap :: HasKeyID a => Map.Map NodeID (RingEntry a) }
 | 
				
			||||||
| 
						 | 
					@ -308,7 +315,7 @@ data RingEntry a = KeyEntry a
 | 
				
			||||||
    deriving (Show, Eq)
 | 
					    deriving (Show, Eq)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-- | 'RingEntry' type for usage as a node cache
 | 
					-- | 'RingEntry' type for usage as a node cache
 | 
				
			||||||
data CacheEntry = CacheEntry Bool RemoteNodeState POSIXTime
 | 
					data CacheEntry a = CacheEntry Bool a POSIXTime
 | 
				
			||||||
    deriving (Show, Eq)
 | 
					    deriving (Show, Eq)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
| 
						 | 
					@ -338,8 +345,8 @@ extractRingEntry (ProxyEntry _ (Just (KeyEntry entry))) = Just entry
 | 
				
			||||||
extractRingEntry _                                      = Nothing
 | 
					extractRingEntry _                                      = Nothing
 | 
				
			||||||
 | 
					
 | 
				
			||||||
--- useful function for getting entries for a full cache transfer
 | 
					--- useful function for getting entries for a full cache transfer
 | 
				
			||||||
cacheEntries :: NodeCache -> [CacheEntry]
 | 
					nodeCacheEntries :: NodeCache -> [NodeCacheEntry]
 | 
				
			||||||
cacheEntries = mapMaybe extractRingEntry . Map.elems . getRingMap
 | 
					nodeCacheEntries = mapMaybe extractRingEntry . Map.elems . getRingMap
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-- | An empty 'RingMap' needs to be initialised with 2 proxy entries,
 | 
					-- | An empty 'RingMap' needs to be initialised with 2 proxy entries,
 | 
				
			||||||
-- linking the modular name space together by connecting @minBound@ and @maxBound@
 | 
					-- linking the modular name space together by connecting @minBound@ and @maxBound@
 | 
				
			||||||
| 
						 | 
					@ -360,7 +367,7 @@ rMapLookup key rmap =  extractRingEntry =<< Map.lookup key (getRingMap rmap)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
cacheLookup :: NodeID       -- ^lookup key
 | 
					cacheLookup :: NodeID       -- ^lookup key
 | 
				
			||||||
            -> NodeCache    -- ^lookup cache
 | 
					            -> NodeCache    -- ^lookup cache
 | 
				
			||||||
            -> Maybe CacheEntry
 | 
					            -> Maybe NodeCacheEntry
 | 
				
			||||||
cacheLookup = rMapLookup
 | 
					cacheLookup = rMapLookup
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-- | returns number of present 'KeyEntry' in a properly initialised 'RingMap'
 | 
					-- | returns number of present 'KeyEntry' in a properly initialised 'RingMap'
 | 
				
			||||||
| 
						 | 
					@ -419,7 +426,7 @@ rMapLookupSucc = lookupWrapper Map.lookupGE Map.lookupGE Forwards
 | 
				
			||||||
 | 
					
 | 
				
			||||||
cacheLookupSucc :: NodeID           -- ^lookup key
 | 
					cacheLookupSucc :: NodeID           -- ^lookup key
 | 
				
			||||||
                -> NodeCache        -- ^ring cache
 | 
					                -> NodeCache        -- ^ring cache
 | 
				
			||||||
                -> Maybe CacheEntry
 | 
					                -> Maybe NodeCacheEntry
 | 
				
			||||||
cacheLookupSucc = rMapLookupSucc
 | 
					cacheLookupSucc = rMapLookupSucc
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-- | find the predecessor node to a given key on a modular EpiChord ring.
 | 
					-- | find the predecessor node to a given key on a modular EpiChord ring.
 | 
				
			||||||
| 
						 | 
					@ -431,7 +438,7 @@ rMapLookupPred = lookupWrapper Map.lookupLT Map.lookupLE Backwards
 | 
				
			||||||
 | 
					
 | 
				
			||||||
cacheLookupPred :: NodeID           -- ^lookup key
 | 
					cacheLookupPred :: NodeID           -- ^lookup key
 | 
				
			||||||
                -> NodeCache        -- ^ring cache
 | 
					                -> NodeCache        -- ^ring cache
 | 
				
			||||||
                -> Maybe CacheEntry
 | 
					                -> Maybe NodeCacheEntry
 | 
				
			||||||
cacheLookupPred = rMapLookupPred
 | 
					cacheLookupPred = rMapLookupPred
 | 
				
			||||||
 | 
					
 | 
				
			||||||
addRMapEntryWith :: HasKeyID a
 | 
					addRMapEntryWith :: HasKeyID a
 | 
				
			||||||
| 
						 | 
					@ -522,7 +529,7 @@ takeRMapSuccessors = takeRMapEntries_ rMapLookupSucc
 | 
				
			||||||
-- transfer difference now - entry to other node
 | 
					-- transfer difference now - entry to other node
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-- | return the @NodeState@ data from a cache entry without checking its validation status
 | 
					-- | return the @NodeState@ data from a cache entry without checking its validation status
 | 
				
			||||||
cacheGetNodeStateUnvalidated :: CacheEntry -> RemoteNodeState
 | 
					cacheGetNodeStateUnvalidated :: CacheEntry RemoteNodeState -> RemoteNodeState
 | 
				
			||||||
cacheGetNodeStateUnvalidated (CacheEntry _ nState _) = nState
 | 
					cacheGetNodeStateUnvalidated (CacheEntry _ nState _) = nState
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-- | converts a 'HostAddress6' IP address to a big-endian strict ByteString
 | 
					-- | converts a 'HostAddress6' IP address to a big-endian strict ByteString
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
| 
						 | 
					@ -89,12 +89,12 @@ data RemoteCacheEntry = RemoteCacheEntry RemoteNodeState POSIXTime
 | 
				
			||||||
instance Ord RemoteCacheEntry where
 | 
					instance Ord RemoteCacheEntry where
 | 
				
			||||||
    (RemoteCacheEntry ns1 _) `compare` (RemoteCacheEntry ns2 _) = nid ns1 `compare` nid ns2
 | 
					    (RemoteCacheEntry ns1 _) `compare` (RemoteCacheEntry ns2 _) = nid ns1 `compare` nid ns2
 | 
				
			||||||
 | 
					
 | 
				
			||||||
toRemoteCacheEntry :: CacheEntry -> RemoteCacheEntry
 | 
					toRemoteCacheEntry :: NodeCacheEntry -> RemoteCacheEntry
 | 
				
			||||||
toRemoteCacheEntry (CacheEntry _ ns ts) = RemoteCacheEntry ns ts
 | 
					toRemoteCacheEntry (CacheEntry _ ns ts) = RemoteCacheEntry ns ts
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-- | a list of all entries of a 'NodeCache' as 'RemoteCacheEntry', useful for cache transfers
 | 
					-- | a list of all entries of a 'NodeCache' as 'RemoteCacheEntry', useful for cache transfers
 | 
				
			||||||
toRemoteCache :: NodeCache -> [RemoteCacheEntry]
 | 
					toRemoteCache :: NodeCache -> [RemoteCacheEntry]
 | 
				
			||||||
toRemoteCache cache = toRemoteCacheEntry <$> cacheEntries cache
 | 
					toRemoteCache cache = toRemoteCacheEntry <$> nodeCacheEntries cache
 | 
				
			||||||
 | 
					
 | 
				
			||||||
-- | extract the 'NodeState' from a 'RemoteCacheEntry'
 | 
					-- | extract the 'NodeState' from a 'RemoteCacheEntry'
 | 
				
			||||||
remoteNode :: RemoteCacheEntry -> RemoteNodeState
 | 
					remoteNode :: RemoteCacheEntry -> RemoteNodeState
 | 
				
			||||||
| 
						 | 
					
 | 
				
			||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue