Compare commits

..

No commits in common. "c37fe88b354644f4b497623d27a2b3e8cfa2f5f8" and "8ade04e48dd3addc276a8fdbde27ecfe15d6a3d7" have entirely different histories.

4 changed files with 21 additions and 30 deletions

View file

@ -20,7 +20,7 @@ main = do
-- ToDo: load persisted caches, bootstrapping nodes … -- ToDo: load persisted caches, bootstrapping nodes …
(serverSock, thisNode) <- fediChordInit conf (serverSock, thisNode) <- fediChordInit conf
-- currently no masking is necessary, as there is nothing to clean up -- currently no masking is necessary, as there is nothing to clean up
nodeCacheWriterThread <- forkIO $ nodeCacheWriter thisNode cacheWriterThread <- forkIO $ cacheWriter thisNode
-- try joining the DHT using one of the provided bootstrapping nodes -- try joining the DHT using one of the provided bootstrapping nodes
joinedState <- tryBootstrapJoining thisNode joinedState <- tryBootstrapJoining thisNode
either (\err -> do either (\err -> do

View file

@ -47,7 +47,7 @@ module Hash2Pub.FediChord (
, mkServerSocket , mkServerSocket
, mkSendSocket , mkSendSocket
, resolve , resolve
, nodeCacheWriter , cacheWriter
, joinOnNewEntriesThread , joinOnNewEntriesThread
) where ) where
@ -94,12 +94,10 @@ import Debug.Trace (trace)
-- ToDo: load persisted state, thus this function already operates in IO -- ToDo: load persisted state, thus this function already operates in IO
fediChordInit :: FediChordConf -> IO (Socket, LocalNodeStateSTM) fediChordInit :: FediChordConf -> IO (Socket, LocalNodeStateSTM)
fediChordInit initConf = do fediChordInit initConf = do
emptyLookupCache <- newTVarIO Map.empty
let realNode = RealNode { let realNode = RealNode {
vservers = [] vservers = []
, nodeConfig = initConf , nodeConfig = initConf
, bootstrapNodes = confBootstrapNodes initConf , bootstrapNodes = confBootstrapNodes initConf
, lookupCacheSTM = emptyLookupCache
} }
realNodeSTM <- newTVarIO realNode realNodeSTM <- newTVarIO realNode
initialState <- nodeStateInit realNodeSTM initialState <- nodeStateInit realNodeSTM
@ -285,8 +283,8 @@ joinOnNewEntriesThread nsSTM = loop
-- | cache updater thread that waits for incoming NodeCache update instructions on -- | cache updater thread that waits for incoming NodeCache update instructions on
-- the node's cacheWriteQueue and then modifies the NodeCache as the single writer. -- the node's cacheWriteQueue and then modifies the NodeCache as the single writer.
nodeCacheWriter :: LocalNodeStateSTM -> IO () cacheWriter :: LocalNodeStateSTM -> IO ()
nodeCacheWriter nsSTM = cacheWriter nsSTM =
forever $ atomically $ do forever $ atomically $ do
ns <- readTVar nsSTM ns <- readTVar nsSTM
cacheModifier <- readTQueue $ cacheWriteQueue ns cacheModifier <- readTQueue $ cacheWriteQueue ns
@ -299,8 +297,8 @@ maxEntryAge = 600
-- | Periodically iterate through cache, clean up expired entries and verify unverified ones -- | Periodically iterate through cache, clean up expired entries and verify unverified ones
nodeCacheVerifyThread :: LocalNodeStateSTM -> IO () cacheVerifyThread :: LocalNodeStateSTM -> IO ()
nodeCacheVerifyThread nsSTM = forever $ do cacheVerifyThread nsSTM = forever $ do
putStrLn "cache verify run: begin" putStrLn "cache verify run: begin"
-- get cache -- get cache
(ns, cache) <- atomically $ do (ns, cache) <- atomically $ do
@ -310,7 +308,7 @@ nodeCacheVerifyThread nsSTM = forever $ do
-- iterate entries: -- iterate entries:
-- for avoiding too many time syscalls, get current time before iterating. -- for avoiding too many time syscalls, get current time before iterating.
now <- getPOSIXTime now <- getPOSIXTime
forM_ (nodeCacheEntries cache) (\(CacheEntry validated node ts) -> forM_ (cacheEntries cache) (\(CacheEntry validated node ts) ->
-- case too old: delete (future work: decide whether pinging and resetting timestamp is better) -- case too old: delete (future work: decide whether pinging and resetting timestamp is better)
if (now - ts) > maxEntryAge if (now - ts) > maxEntryAge
then then
@ -543,7 +541,7 @@ fediMainThreads sock nsSTM = do
concurrently_ concurrently_
(fediMessageHandler sendQ recvQ nsSTM) $ (fediMessageHandler sendQ recvQ nsSTM) $
concurrently_ (stabiliseThread nsSTM) $ concurrently_ (stabiliseThread nsSTM) $
concurrently_ (nodeCacheVerifyThread nsSTM) $ concurrently_ (cacheVerifyThread nsSTM) $
concurrently_ (convergenceSampleThread nsSTM) $ concurrently_ (convergenceSampleThread nsSTM) $
concurrently_ concurrently_
(sendThread sock sendQ) (sendThread sock sendQ)

View file

@ -18,7 +18,6 @@ module Hash2Pub.FediChordTypes (
, setSuccessors , setSuccessors
, setPredecessors , setPredecessors
, NodeCache , NodeCache
, NodeCacheEntry
, CacheEntry(..) , CacheEntry(..)
, RingEntry(..) , RingEntry(..)
, RingMap(..) , RingMap(..)
@ -40,7 +39,7 @@ module Hash2Pub.FediChordTypes (
, rMapToList , rMapToList
, cacheGetNodeStateUnvalidated , cacheGetNodeStateUnvalidated
, initCache , initCache
, nodeCacheEntries , cacheEntries
, cacheLookup , cacheLookup
, cacheLookupSucc , cacheLookupSucc
, cacheLookupPred , cacheLookupPred
@ -145,8 +144,6 @@ data RealNode = RealNode
-- ^ holds the initial configuration read at program start -- ^ holds the initial configuration read at program start
, bootstrapNodes :: [(String, PortNumber)] , bootstrapNodes :: [(String, PortNumber)]
-- ^ nodes to be used as bootstrapping points, new ones learned during operation -- ^ nodes to be used as bootstrapping points, new ones learned during operation
, lookupCacheSTM :: TVar LookupCache
-- ^ a global cache of looked up keys and their associated nodes
} }
type RealNodeSTM = TVar RealNode type RealNodeSTM = TVar RealNode
@ -287,17 +284,13 @@ class (Eq a, Show a) => HasKeyID a where
instance HasKeyID RemoteNodeState where instance HasKeyID RemoteNodeState where
getKeyID = getNid getKeyID = getNid
instance HasKeyID a => HasKeyID (CacheEntry a) where instance HasKeyID CacheEntry where
getKeyID (CacheEntry _ obj _) = getKeyID obj getKeyID (CacheEntry _ ns _) = getNid ns
instance HasKeyID NodeID where instance HasKeyID NodeID where
getKeyID = id getKeyID = id
type NodeCacheEntry = CacheEntry RemoteNodeState type NodeCache = RingMap CacheEntry
type NodeCache = RingMap NodeCacheEntry
type LookupCacheEntry = CacheEntry (String, PortNumber)
type LookupCache = Map.Map NodeID LookupCacheEntry
-- | generic data structure for holding elements with a key and modular lookup -- | generic data structure for holding elements with a key and modular lookup
newtype RingMap a = RingMap { getRingMap :: HasKeyID a => Map.Map NodeID (RingEntry a) } newtype RingMap a = RingMap { getRingMap :: HasKeyID a => Map.Map NodeID (RingEntry a) }
@ -315,7 +308,7 @@ data RingEntry a = KeyEntry a
deriving (Show, Eq) deriving (Show, Eq)
-- | 'RingEntry' type for usage as a node cache -- | 'RingEntry' type for usage as a node cache
data CacheEntry a = CacheEntry Bool a POSIXTime data CacheEntry = CacheEntry Bool RemoteNodeState POSIXTime
deriving (Show, Eq) deriving (Show, Eq)
@ -345,8 +338,8 @@ extractRingEntry (ProxyEntry _ (Just (KeyEntry entry))) = Just entry
extractRingEntry _ = Nothing extractRingEntry _ = Nothing
--- useful function for getting entries for a full cache transfer --- useful function for getting entries for a full cache transfer
nodeCacheEntries :: NodeCache -> [NodeCacheEntry] cacheEntries :: NodeCache -> [CacheEntry]
nodeCacheEntries = mapMaybe extractRingEntry . Map.elems . getRingMap cacheEntries = mapMaybe extractRingEntry . Map.elems . getRingMap
-- | An empty 'RingMap' needs to be initialised with 2 proxy entries, -- | An empty 'RingMap' needs to be initialised with 2 proxy entries,
-- linking the modular name space together by connecting @minBound@ and @maxBound@ -- linking the modular name space together by connecting @minBound@ and @maxBound@
@ -367,7 +360,7 @@ rMapLookup key rmap = extractRingEntry =<< Map.lookup key (getRingMap rmap)
cacheLookup :: NodeID -- ^lookup key cacheLookup :: NodeID -- ^lookup key
-> NodeCache -- ^lookup cache -> NodeCache -- ^lookup cache
-> Maybe NodeCacheEntry -> Maybe CacheEntry
cacheLookup = rMapLookup cacheLookup = rMapLookup
-- | returns number of present 'KeyEntry' in a properly initialised 'RingMap' -- | returns number of present 'KeyEntry' in a properly initialised 'RingMap'
@ -426,7 +419,7 @@ rMapLookupSucc = lookupWrapper Map.lookupGE Map.lookupGE Forwards
cacheLookupSucc :: NodeID -- ^lookup key cacheLookupSucc :: NodeID -- ^lookup key
-> NodeCache -- ^ring cache -> NodeCache -- ^ring cache
-> Maybe NodeCacheEntry -> Maybe CacheEntry
cacheLookupSucc = rMapLookupSucc cacheLookupSucc = rMapLookupSucc
-- | find the predecessor node to a given key on a modular EpiChord ring. -- | find the predecessor node to a given key on a modular EpiChord ring.
@ -438,7 +431,7 @@ rMapLookupPred = lookupWrapper Map.lookupLT Map.lookupLE Backwards
cacheLookupPred :: NodeID -- ^lookup key cacheLookupPred :: NodeID -- ^lookup key
-> NodeCache -- ^ring cache -> NodeCache -- ^ring cache
-> Maybe NodeCacheEntry -> Maybe CacheEntry
cacheLookupPred = rMapLookupPred cacheLookupPred = rMapLookupPred
addRMapEntryWith :: HasKeyID a addRMapEntryWith :: HasKeyID a
@ -529,7 +522,7 @@ takeRMapSuccessors = takeRMapEntries_ rMapLookupSucc
-- transfer difference now - entry to other node -- transfer difference now - entry to other node
-- | return the @NodeState@ data from a cache entry without checking its validation status -- | return the @NodeState@ data from a cache entry without checking its validation status
cacheGetNodeStateUnvalidated :: CacheEntry RemoteNodeState -> RemoteNodeState cacheGetNodeStateUnvalidated :: CacheEntry -> RemoteNodeState
cacheGetNodeStateUnvalidated (CacheEntry _ nState _) = nState cacheGetNodeStateUnvalidated (CacheEntry _ nState _) = nState
-- | converts a 'HostAddress6' IP address to a big-endian strict ByteString -- | converts a 'HostAddress6' IP address to a big-endian strict ByteString

View file

@ -89,12 +89,12 @@ data RemoteCacheEntry = RemoteCacheEntry RemoteNodeState POSIXTime
instance Ord RemoteCacheEntry where instance Ord RemoteCacheEntry where
(RemoteCacheEntry ns1 _) `compare` (RemoteCacheEntry ns2 _) = nid ns1 `compare` nid ns2 (RemoteCacheEntry ns1 _) `compare` (RemoteCacheEntry ns2 _) = nid ns1 `compare` nid ns2
toRemoteCacheEntry :: NodeCacheEntry -> RemoteCacheEntry toRemoteCacheEntry :: CacheEntry -> RemoteCacheEntry
toRemoteCacheEntry (CacheEntry _ ns ts) = RemoteCacheEntry ns ts toRemoteCacheEntry (CacheEntry _ ns ts) = RemoteCacheEntry ns ts
-- | a list of all entries of a 'NodeCache' as 'RemoteCacheEntry', useful for cache transfers -- | a list of all entries of a 'NodeCache' as 'RemoteCacheEntry', useful for cache transfers
toRemoteCache :: NodeCache -> [RemoteCacheEntry] toRemoteCache :: NodeCache -> [RemoteCacheEntry]
toRemoteCache cache = toRemoteCacheEntry <$> nodeCacheEntries cache toRemoteCache cache = toRemoteCacheEntry <$> cacheEntries cache
-- | extract the 'NodeState' from a 'RemoteCacheEntry' -- | extract the 'NodeState' from a 'RemoteCacheEntry'
remoteNode :: RemoteCacheEntry -> RemoteNodeState remoteNode :: RemoteCacheEntry -> RemoteNodeState