parent
8ade04e48d
commit
42af6afb86
|
@ -20,7 +20,7 @@ main = do
|
||||||
-- ToDo: load persisted caches, bootstrapping nodes …
|
-- ToDo: load persisted caches, bootstrapping nodes …
|
||||||
(serverSock, thisNode) <- fediChordInit conf
|
(serverSock, thisNode) <- fediChordInit conf
|
||||||
-- currently no masking is necessary, as there is nothing to clean up
|
-- currently no masking is necessary, as there is nothing to clean up
|
||||||
cacheWriterThread <- forkIO $ cacheWriter thisNode
|
nodeCacheWriterThread <- forkIO $ nodeCacheWriter thisNode
|
||||||
-- try joining the DHT using one of the provided bootstrapping nodes
|
-- try joining the DHT using one of the provided bootstrapping nodes
|
||||||
joinedState <- tryBootstrapJoining thisNode
|
joinedState <- tryBootstrapJoining thisNode
|
||||||
either (\err -> do
|
either (\err -> do
|
||||||
|
|
|
@ -47,7 +47,7 @@ module Hash2Pub.FediChord (
|
||||||
, mkServerSocket
|
, mkServerSocket
|
||||||
, mkSendSocket
|
, mkSendSocket
|
||||||
, resolve
|
, resolve
|
||||||
, cacheWriter
|
, nodeCacheWriter
|
||||||
, joinOnNewEntriesThread
|
, joinOnNewEntriesThread
|
||||||
) where
|
) where
|
||||||
|
|
||||||
|
@ -283,8 +283,8 @@ joinOnNewEntriesThread nsSTM = loop
|
||||||
|
|
||||||
-- | cache updater thread that waits for incoming NodeCache update instructions on
|
-- | cache updater thread that waits for incoming NodeCache update instructions on
|
||||||
-- the node's cacheWriteQueue and then modifies the NodeCache as the single writer.
|
-- the node's cacheWriteQueue and then modifies the NodeCache as the single writer.
|
||||||
cacheWriter :: LocalNodeStateSTM -> IO ()
|
nodeCacheWriter :: LocalNodeStateSTM -> IO ()
|
||||||
cacheWriter nsSTM =
|
nodeCacheWriter nsSTM =
|
||||||
forever $ atomically $ do
|
forever $ atomically $ do
|
||||||
ns <- readTVar nsSTM
|
ns <- readTVar nsSTM
|
||||||
cacheModifier <- readTQueue $ cacheWriteQueue ns
|
cacheModifier <- readTQueue $ cacheWriteQueue ns
|
||||||
|
@ -297,8 +297,8 @@ maxEntryAge = 600
|
||||||
|
|
||||||
|
|
||||||
-- | Periodically iterate through cache, clean up expired entries and verify unverified ones
|
-- | Periodically iterate through cache, clean up expired entries and verify unverified ones
|
||||||
cacheVerifyThread :: LocalNodeStateSTM -> IO ()
|
nodeCacheVerifyThread :: LocalNodeStateSTM -> IO ()
|
||||||
cacheVerifyThread nsSTM = forever $ do
|
nodeCacheVerifyThread nsSTM = forever $ do
|
||||||
putStrLn "cache verify run: begin"
|
putStrLn "cache verify run: begin"
|
||||||
-- get cache
|
-- get cache
|
||||||
(ns, cache) <- atomically $ do
|
(ns, cache) <- atomically $ do
|
||||||
|
@ -308,7 +308,7 @@ cacheVerifyThread nsSTM = forever $ do
|
||||||
-- iterate entries:
|
-- iterate entries:
|
||||||
-- for avoiding too many time syscalls, get current time before iterating.
|
-- for avoiding too many time syscalls, get current time before iterating.
|
||||||
now <- getPOSIXTime
|
now <- getPOSIXTime
|
||||||
forM_ (cacheEntries cache) (\(CacheEntry validated node ts) ->
|
forM_ (nodeCacheEntries cache) (\(CacheEntry validated node ts) ->
|
||||||
-- case too old: delete (future work: decide whether pinging and resetting timestamp is better)
|
-- case too old: delete (future work: decide whether pinging and resetting timestamp is better)
|
||||||
if (now - ts) > maxEntryAge
|
if (now - ts) > maxEntryAge
|
||||||
then
|
then
|
||||||
|
@ -541,7 +541,7 @@ fediMainThreads sock nsSTM = do
|
||||||
concurrently_
|
concurrently_
|
||||||
(fediMessageHandler sendQ recvQ nsSTM) $
|
(fediMessageHandler sendQ recvQ nsSTM) $
|
||||||
concurrently_ (stabiliseThread nsSTM) $
|
concurrently_ (stabiliseThread nsSTM) $
|
||||||
concurrently_ (cacheVerifyThread nsSTM) $
|
concurrently_ (nodeCacheVerifyThread nsSTM) $
|
||||||
concurrently_ (convergenceSampleThread nsSTM) $
|
concurrently_ (convergenceSampleThread nsSTM) $
|
||||||
concurrently_
|
concurrently_
|
||||||
(sendThread sock sendQ)
|
(sendThread sock sendQ)
|
||||||
|
|
|
@ -18,6 +18,7 @@ module Hash2Pub.FediChordTypes (
|
||||||
, setSuccessors
|
, setSuccessors
|
||||||
, setPredecessors
|
, setPredecessors
|
||||||
, NodeCache
|
, NodeCache
|
||||||
|
, NodeCacheEntry
|
||||||
, CacheEntry(..)
|
, CacheEntry(..)
|
||||||
, RingEntry(..)
|
, RingEntry(..)
|
||||||
, RingMap(..)
|
, RingMap(..)
|
||||||
|
@ -39,7 +40,7 @@ module Hash2Pub.FediChordTypes (
|
||||||
, rMapToList
|
, rMapToList
|
||||||
, cacheGetNodeStateUnvalidated
|
, cacheGetNodeStateUnvalidated
|
||||||
, initCache
|
, initCache
|
||||||
, cacheEntries
|
, nodeCacheEntries
|
||||||
, cacheLookup
|
, cacheLookup
|
||||||
, cacheLookupSucc
|
, cacheLookupSucc
|
||||||
, cacheLookupPred
|
, cacheLookupPred
|
||||||
|
@ -284,13 +285,14 @@ class (Eq a, Show a) => HasKeyID a where
|
||||||
instance HasKeyID RemoteNodeState where
|
instance HasKeyID RemoteNodeState where
|
||||||
getKeyID = getNid
|
getKeyID = getNid
|
||||||
|
|
||||||
instance HasKeyID CacheEntry where
|
instance HasKeyID a => HasKeyID (CacheEntry a) where
|
||||||
getKeyID (CacheEntry _ ns _) = getNid ns
|
getKeyID (CacheEntry _ obj _) = getKeyID obj
|
||||||
|
|
||||||
instance HasKeyID NodeID where
|
instance HasKeyID NodeID where
|
||||||
getKeyID = id
|
getKeyID = id
|
||||||
|
|
||||||
type NodeCache = RingMap CacheEntry
|
type NodeCacheEntry = CacheEntry RemoteNodeState
|
||||||
|
type NodeCache = RingMap NodeCacheEntry
|
||||||
|
|
||||||
-- | generic data structure for holding elements with a key and modular lookup
|
-- | generic data structure for holding elements with a key and modular lookup
|
||||||
newtype RingMap a = RingMap { getRingMap :: HasKeyID a => Map.Map NodeID (RingEntry a) }
|
newtype RingMap a = RingMap { getRingMap :: HasKeyID a => Map.Map NodeID (RingEntry a) }
|
||||||
|
@ -308,7 +310,7 @@ data RingEntry a = KeyEntry a
|
||||||
deriving (Show, Eq)
|
deriving (Show, Eq)
|
||||||
|
|
||||||
-- | 'RingEntry' type for usage as a node cache
|
-- | 'RingEntry' type for usage as a node cache
|
||||||
data CacheEntry = CacheEntry Bool RemoteNodeState POSIXTime
|
data CacheEntry a = CacheEntry Bool a POSIXTime
|
||||||
deriving (Show, Eq)
|
deriving (Show, Eq)
|
||||||
|
|
||||||
|
|
||||||
|
@ -338,8 +340,8 @@ extractRingEntry (ProxyEntry _ (Just (KeyEntry entry))) = Just entry
|
||||||
extractRingEntry _ = Nothing
|
extractRingEntry _ = Nothing
|
||||||
|
|
||||||
--- useful function for getting entries for a full cache transfer
|
--- useful function for getting entries for a full cache transfer
|
||||||
cacheEntries :: NodeCache -> [CacheEntry]
|
nodeCacheEntries :: NodeCache -> [NodeCacheEntry]
|
||||||
cacheEntries = mapMaybe extractRingEntry . Map.elems . getRingMap
|
nodeCacheEntries = mapMaybe extractRingEntry . Map.elems . getRingMap
|
||||||
|
|
||||||
-- | An empty 'RingMap' needs to be initialised with 2 proxy entries,
|
-- | An empty 'RingMap' needs to be initialised with 2 proxy entries,
|
||||||
-- linking the modular name space together by connecting @minBound@ and @maxBound@
|
-- linking the modular name space together by connecting @minBound@ and @maxBound@
|
||||||
|
@ -360,7 +362,7 @@ rMapLookup key rmap = extractRingEntry =<< Map.lookup key (getRingMap rmap)
|
||||||
|
|
||||||
cacheLookup :: NodeID -- ^lookup key
|
cacheLookup :: NodeID -- ^lookup key
|
||||||
-> NodeCache -- ^lookup cache
|
-> NodeCache -- ^lookup cache
|
||||||
-> Maybe CacheEntry
|
-> Maybe NodeCacheEntry
|
||||||
cacheLookup = rMapLookup
|
cacheLookup = rMapLookup
|
||||||
|
|
||||||
-- | returns number of present 'KeyEntry' in a properly initialised 'RingMap'
|
-- | returns number of present 'KeyEntry' in a properly initialised 'RingMap'
|
||||||
|
@ -419,7 +421,7 @@ rMapLookupSucc = lookupWrapper Map.lookupGE Map.lookupGE Forwards
|
||||||
|
|
||||||
cacheLookupSucc :: NodeID -- ^lookup key
|
cacheLookupSucc :: NodeID -- ^lookup key
|
||||||
-> NodeCache -- ^ring cache
|
-> NodeCache -- ^ring cache
|
||||||
-> Maybe CacheEntry
|
-> Maybe NodeCacheEntry
|
||||||
cacheLookupSucc = rMapLookupSucc
|
cacheLookupSucc = rMapLookupSucc
|
||||||
|
|
||||||
-- | find the predecessor node to a given key on a modular EpiChord ring.
|
-- | find the predecessor node to a given key on a modular EpiChord ring.
|
||||||
|
@ -431,7 +433,7 @@ rMapLookupPred = lookupWrapper Map.lookupLT Map.lookupLE Backwards
|
||||||
|
|
||||||
cacheLookupPred :: NodeID -- ^lookup key
|
cacheLookupPred :: NodeID -- ^lookup key
|
||||||
-> NodeCache -- ^ring cache
|
-> NodeCache -- ^ring cache
|
||||||
-> Maybe CacheEntry
|
-> Maybe NodeCacheEntry
|
||||||
cacheLookupPred = rMapLookupPred
|
cacheLookupPred = rMapLookupPred
|
||||||
|
|
||||||
addRMapEntryWith :: HasKeyID a
|
addRMapEntryWith :: HasKeyID a
|
||||||
|
@ -522,7 +524,7 @@ takeRMapSuccessors = takeRMapEntries_ rMapLookupSucc
|
||||||
-- transfer difference now - entry to other node
|
-- transfer difference now - entry to other node
|
||||||
|
|
||||||
-- | return the @NodeState@ data from a cache entry without checking its validation status
|
-- | return the @NodeState@ data from a cache entry without checking its validation status
|
||||||
cacheGetNodeStateUnvalidated :: CacheEntry -> RemoteNodeState
|
cacheGetNodeStateUnvalidated :: CacheEntry RemoteNodeState -> RemoteNodeState
|
||||||
cacheGetNodeStateUnvalidated (CacheEntry _ nState _) = nState
|
cacheGetNodeStateUnvalidated (CacheEntry _ nState _) = nState
|
||||||
|
|
||||||
-- | converts a 'HostAddress6' IP address to a big-endian strict ByteString
|
-- | converts a 'HostAddress6' IP address to a big-endian strict ByteString
|
||||||
|
|
|
@ -89,12 +89,12 @@ data RemoteCacheEntry = RemoteCacheEntry RemoteNodeState POSIXTime
|
||||||
instance Ord RemoteCacheEntry where
|
instance Ord RemoteCacheEntry where
|
||||||
(RemoteCacheEntry ns1 _) `compare` (RemoteCacheEntry ns2 _) = nid ns1 `compare` nid ns2
|
(RemoteCacheEntry ns1 _) `compare` (RemoteCacheEntry ns2 _) = nid ns1 `compare` nid ns2
|
||||||
|
|
||||||
toRemoteCacheEntry :: CacheEntry -> RemoteCacheEntry
|
toRemoteCacheEntry :: NodeCacheEntry -> RemoteCacheEntry
|
||||||
toRemoteCacheEntry (CacheEntry _ ns ts) = RemoteCacheEntry ns ts
|
toRemoteCacheEntry (CacheEntry _ ns ts) = RemoteCacheEntry ns ts
|
||||||
|
|
||||||
-- | a list of all entries of a 'NodeCache' as 'RemoteCacheEntry', useful for cache transfers
|
-- | a list of all entries of a 'NodeCache' as 'RemoteCacheEntry', useful for cache transfers
|
||||||
toRemoteCache :: NodeCache -> [RemoteCacheEntry]
|
toRemoteCache :: NodeCache -> [RemoteCacheEntry]
|
||||||
toRemoteCache cache = toRemoteCacheEntry <$> cacheEntries cache
|
toRemoteCache cache = toRemoteCacheEntry <$> nodeCacheEntries cache
|
||||||
|
|
||||||
-- | extract the 'NodeState' from a 'RemoteCacheEntry'
|
-- | extract the 'NodeState' from a 'RemoteCacheEntry'
|
||||||
remoteNode :: RemoteCacheEntry -> RemoteNodeState
|
remoteNode :: RemoteCacheEntry -> RemoteNodeState
|
||||||
|
|
Loading…
Reference in a new issue