parent
c9783a10cf
commit
5e8cfb0ccd
|
@ -3,9 +3,10 @@ module Hash2Pub.DHTProtocol
|
|||
, queryLocalCache
|
||||
, addCacheEntry
|
||||
, addCacheEntryPure
|
||||
, addNodeAsVerified
|
||||
, addNodeAsVerifiedPure
|
||||
, deleteCacheEntry
|
||||
, deserialiseMessage
|
||||
, markCacheEntryAsVerified
|
||||
, RemoteCacheEntry(..)
|
||||
, toRemoteCacheEntry
|
||||
, remoteNode
|
||||
|
@ -64,9 +65,10 @@ import Hash2Pub.FediChordTypes (CacheEntry (..),
|
|||
addRMapEntry, addRMapEntryWith,
|
||||
cacheGetNodeStateUnvalidated,
|
||||
cacheLookup, cacheLookupPred,
|
||||
cacheLookupSucc, getKeyID,
|
||||
localCompare, rMapFromList,
|
||||
rMapLookupPred, rMapLookupSucc,
|
||||
cacheLookupSucc, genNodeID,
|
||||
getKeyID, localCompare,
|
||||
rMapFromList, rMapLookupPred,
|
||||
rMapLookupSucc,
|
||||
setPredecessors, setSuccessors)
|
||||
import Hash2Pub.ProtocolTypes
|
||||
|
||||
|
@ -154,6 +156,26 @@ deleteCacheEntry nid = RingMap . Map.update modifier nid . getRingMap
|
|||
modifier (ProxyEntry idPointer _) = Just (ProxyEntry idPointer Nothing)
|
||||
modifier KeyEntry {} = Nothing
|
||||
|
||||
|
||||
-- | Add a 'RemoteNodeState' to the node cache marked as verified.
|
||||
-- If an entry already exists, it is replaced by the new verified one.
|
||||
addNodeAsVerified :: RemoteNodeState
|
||||
-> NodeCache
|
||||
-> IO NodeCache
|
||||
addNodeAsVerified node cache = do
|
||||
now <- getPOSIXTime
|
||||
pure $ addNodeAsVerifiedPure now node cache
|
||||
|
||||
|
||||
-- | Pure variant of 'addNodeAsVerified' with current time explicitly specified as an argument
|
||||
addNodeAsVerifiedPure :: POSIXTime
|
||||
-> RemoteNodeState
|
||||
-> NodeCache
|
||||
-> NodeCache
|
||||
addNodeAsVerifiedPure now node = addRMapEntry (CacheEntry True node now)
|
||||
|
||||
|
||||
|
||||
-- | Mark a cache entry as verified after pinging it, possibly bumping its timestamp.
|
||||
markCacheEntryAsVerified :: Maybe POSIXTime -- ^ the (current) timestamp to be
|
||||
-- given to the entry, or Nothing
|
||||
|
@ -540,26 +562,38 @@ requestPing :: LocalNodeState -- ^ sending node
|
|||
-> RemoteNodeState -- ^ node to be PINGed
|
||||
-> IO (Either String [RemoteNodeState]) -- ^ all active vServers of the pinged node
|
||||
requestPing ns target = do
|
||||
responses <- bracket (mkSendSocket (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo 5000 3 (\rid ->
|
||||
Request {
|
||||
requestID = rid
|
||||
, sender = toRemoteNodeState ns
|
||||
, part = 1
|
||||
, isFinalPart = False
|
||||
, action = Ping
|
||||
, payload = Just PingRequestPayload
|
||||
}
|
||||
)
|
||||
) `catch` (\e -> pure . Left $ displayException (e :: IOException))
|
||||
responses <- bracket (mkSendSocket (getDomain target) (getDhtPort target)) close
|
||||
(\sock -> do
|
||||
resp <- sendRequestTo 5000 3 (\rid ->
|
||||
Request {
|
||||
requestID = rid
|
||||
, sender = toRemoteNodeState ns
|
||||
, part = 1
|
||||
, isFinalPart = False
|
||||
, action = Ping
|
||||
, payload = Just PingRequestPayload
|
||||
}
|
||||
) sock
|
||||
(SockAddrInet6 _ _ peerAddr _) <- getPeerName sock
|
||||
pure $ Right (peerAddr, resp)
|
||||
) `catch` (\e -> pure . Left $ displayException (e :: IOException))
|
||||
either
|
||||
-- forward IO error messages
|
||||
(pure . Left)
|
||||
(\respSet -> do
|
||||
(\(peerAddr, respSet) -> do
|
||||
-- fold all reply parts together
|
||||
let responseVss = foldr' (\msg acc ->
|
||||
maybe acc (foldr' (:) acc) (pingNodeStates <$> payload msg)
|
||||
)
|
||||
[] respSet
|
||||
-- recompute ID for each received node and mark as verified in cache
|
||||
now <- getPOSIXTime
|
||||
forM_ responseVss (\vs ->
|
||||
let recomputedID = genNodeID peerAddr (getDomain vs) (fromInteger $ getVServerID vs)
|
||||
in if recomputedID == getNid vs
|
||||
then atomically $ writeTQueue (cacheWriteQueue ns) $ addNodeAsVerifiedPure now vs
|
||||
else pure ()
|
||||
)
|
||||
pure $ if null responseVss
|
||||
then Left "no active vServer IDs returned, ignoring node"
|
||||
else Right responseVss
|
||||
|
|
Loading…
Reference in a new issue