forked from schmittlauch/Hash2Pub
sending a queryID request compiles (untested)
This commit is contained in:
parent
beffab99a0
commit
f6c252d314
6 changed files with 83 additions and 41 deletions
|
@ -7,7 +7,7 @@ module Hash2Pub.DHTProtocol
|
|||
, markCacheEntryAsVerified
|
||||
, RemoteCacheEntry(..)
|
||||
, toRemoteCacheEntry
|
||||
, remoteNode_
|
||||
, remoteNode
|
||||
, Action(..)
|
||||
, ActionPayload(..)
|
||||
, FediChordMessage(..)
|
||||
|
@ -15,17 +15,25 @@ module Hash2Pub.DHTProtocol
|
|||
)
|
||||
where
|
||||
|
||||
import Control.Concurrent.Async
|
||||
import Control.Concurrent.STM
|
||||
import Control.Concurrent.STM.TBQueue
|
||||
import Control.Concurrent.STM.TQueue
|
||||
import Control.Monad (foldM, forM, forM_)
|
||||
import qualified Data.ByteString as BS
|
||||
import Data.Either (rights)
|
||||
import Data.Foldable (foldl')
|
||||
import Data.IORef
|
||||
import qualified Data.Map as Map
|
||||
import Data.Maybe (fromMaybe, maybe)
|
||||
import Data.Maybe (fromJust, fromMaybe, mapMaybe,
|
||||
maybe)
|
||||
import qualified Data.Set as Set
|
||||
import Data.Time.Clock.POSIX
|
||||
import Network.Socket hiding (recv, recvFrom, send,
|
||||
sendTo)
|
||||
import Network.Socket.ByteString
|
||||
import Safe
|
||||
import System.Random
|
||||
import System.Timeout
|
||||
|
||||
import Hash2Pub.ASN1Coding
|
||||
|
@ -34,9 +42,13 @@ import Hash2Pub.FediChord (CacheEntry (..), NodeCache,
|
|||
cacheGetNodeStateUnvalidated,
|
||||
cacheLookup, cacheLookupPred,
|
||||
cacheLookupSucc,
|
||||
getCacheWriteQueue,
|
||||
getLNumBestNodes,
|
||||
getNodeCacheRef,
|
||||
getPredecessors, getSuccessors,
|
||||
localCompare, putPredecessors,
|
||||
putSuccessors)
|
||||
localCompare, mkSendSocket,
|
||||
mkServerSocket,
|
||||
putPredecessors, putSuccessors)
|
||||
import Hash2Pub.ProtocolTypes
|
||||
|
||||
import Debug.Trace (trace)
|
||||
|
@ -123,36 +135,71 @@ markCacheEntryAsVerified timestamp = Map.adjust adjustFunc
|
|||
|
||||
-- ====== message send and receive operations ======
|
||||
|
||||
--requestQueryID :: NodeState -> NodeID -> IO NodeState
|
||||
---- 1. do a local lookup for the l closest nodes
|
||||
---- 2. create l sockets
|
||||
---- 3. send a message async concurrently to all l nodes
|
||||
---- 4. collect the results, insert them into cache
|
||||
---- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
|
||||
--requestQueryID ns targetID = do
|
||||
-- cacheSnapshot <- readIORef $ getNodeCacheRef ns
|
||||
-- let localResult = queryLocalCache ns cacheSnapshot (fromMaybe 1 $ getLNumBestNodes ns) targetID
|
||||
-- -- FOUND can only be returned if targetID is owned by local node
|
||||
-- case localResult of
|
||||
-- FOUND thisNode -> return thisNode
|
||||
-- FORWARD nodeSet ->
|
||||
-- sockets <- mapM (\resultNode -> mkSendSocket (domain result) (dhtPort resultNode)) $ Set.toList nodeSet
|
||||
-- -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
|
||||
-- responses = mapM
|
||||
requestQueryID :: NodeState -> NodeID -> IO NodeState
|
||||
-- 1. do a local lookup for the l closest nodes
|
||||
-- 2. create l sockets
|
||||
-- 3. send a message async concurrently to all l nodes
|
||||
-- 4. collect the results, insert them into cache
|
||||
-- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
|
||||
requestQueryID ns targetID = do
|
||||
firstCacheSnapshot <- readIORef $ fromJust . getNodeCacheRef $ ns
|
||||
lookupLoop firstCacheSnapshot
|
||||
where
|
||||
lookupLoop :: NodeCache -> IO NodeState
|
||||
lookupLoop cacheSnapshot = do
|
||||
let localResult = queryLocalCache ns cacheSnapshot (fromMaybe 1 $ getLNumBestNodes ns) targetID
|
||||
-- FOUND can only be returned if targetID is owned by local node
|
||||
case localResult of
|
||||
FOUND thisNode -> pure thisNode
|
||||
FORWARD nodeSet -> do
|
||||
-- create connected sockets to all query targets
|
||||
sockets <- mapM (\resultNode -> mkSendSocket (domain resultNode) (dhtPort resultNode)) $ remoteNode <$> Set.toList nodeSet
|
||||
-- ToDo: make attempts and timeout configurable
|
||||
queryThreads <- mapM (async . sendRequestTo 5000 3 (lookupMessage targetID)) sockets
|
||||
-- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
|
||||
-- ToDo: exception handling, maybe log them
|
||||
responses <- (mconcat . fmap Set.elems) . rights <$> mapM waitCatch queryThreads
|
||||
-- insert new cache entries both into global cache as well as in local copy, to make sure it is already up to date at next lookup
|
||||
now <- getPOSIXTime
|
||||
newLCache <- foldM (\oldCache resp -> do
|
||||
let entriesToInsert = case queryResult <$> payload resp of
|
||||
Just (FOUND result1) -> [addCacheEntryPure now (RemoteCacheEntry result1 now)]
|
||||
Just (FORWARD resultset) -> addCacheEntryPure now <$> Set.elems resultset
|
||||
_ -> []
|
||||
-- forward entries to global cache
|
||||
forM_ entriesToInsert $ \entry -> atomically $ writeTQueue (fromJust . getCacheWriteQueue $ ns) entry
|
||||
-- insert entries into local cache copy
|
||||
pure $ foldl' (
|
||||
\oldLCache insertFunc -> insertFunc oldLCache
|
||||
) oldCache entriesToInsert
|
||||
) cacheSnapshot responses
|
||||
|
||||
-- check for a FOUND and return it
|
||||
let foundResp = headMay . mapMaybe (\resp -> case queryResult <$> payload resp of
|
||||
Just (FOUND ns') -> Just ns'
|
||||
_ -> Nothing
|
||||
) $ responses
|
||||
-- if no FOUND, recursively call lookup again
|
||||
maybe (lookupLoop newLCache) pure foundResp
|
||||
|
||||
-- todo: random request ID
|
||||
lookupMessage targetID rID = Request rID ns 1 1 QueryID (Just $ pl ns targetID)
|
||||
pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = fromIntegral . fromJust . getLNumBestNodes $ ns }
|
||||
|
||||
-- | Generic function for sending a request over a connected socket and collecting the response.
|
||||
-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
|
||||
sendRequestTo :: Int -- ^ timeout in seconds
|
||||
-> Int -- ^ number of retries
|
||||
-> FediChordMessage -- ^ the message to be sent
|
||||
-> (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID
|
||||
-> Socket -- ^ connected socket to use for sending
|
||||
-> IO (Set.Set FediChordMessage) -- ^ responses
|
||||
sendRequestTo timeoutMillis numAttempts msg sock = do
|
||||
let requests = serialiseMessage 1200 msg
|
||||
sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
|
||||
-- give the message a random request ID
|
||||
randomID <- randomRIO (0, 2^32-1)
|
||||
let requests = serialiseMessage 1200 $ msgIncomplete randomID
|
||||
-- create a queue for passing received response messages back, even after a timeout
|
||||
responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets
|
||||
-- start sendAndAck with timeout
|
||||
-- ToDo: make attempts and timeout configurable
|
||||
attempts numAttempts . timeout timeoutMillis $ sendAndAck responseQ sock requests
|
||||
-- after timeout, check received responses, delete them from unacked message set/ map and rerun senAndAck with that if necessary.
|
||||
recvdParts <- atomically $ flushTBQueue responseQ
|
||||
|
@ -190,13 +237,6 @@ sendRequestTo timeoutMillis numAttempts msg sock = do
|
|||
else recvLoop responseQueue newRemaining receivedPartNums
|
||||
|
||||
|
||||
|
||||
|
||||
-- idea: send all parts at once
|
||||
-- Set/ Map with unacked parts
|
||||
-- then recv with timeout for |unackedParts| attempts, receive acked parts from set/ map
|
||||
-- how to manage individual retries? nested "attempts"
|
||||
|
||||
-- | retry an IO action at most *i* times until it delivers a result
|
||||
attempts :: Int -- ^ number of retries *i*
|
||||
-> IO (Maybe a) -- ^ action to retry
|
||||
|
|
|
@ -24,6 +24,7 @@ module Hash2Pub.FediChord (
|
|||
, getPredecessors
|
||||
, putPredecessors
|
||||
, getLNumBestNodes
|
||||
, getCacheWriteQueue
|
||||
, NodeCache
|
||||
, CacheEntry(..)
|
||||
, cacheGetNodeStateUnvalidated
|
||||
|
@ -43,6 +44,7 @@ module Hash2Pub.FediChord (
|
|||
, fediChordInit
|
||||
, nodeStateInit
|
||||
, mkServerSocket
|
||||
, mkSendSocket
|
||||
, resolve
|
||||
, cacheWriter
|
||||
) where
|
||||
|
|
|
@ -84,6 +84,6 @@ toRemoteCacheEntry (NodeEntry _ ns ts) = Just $ RemoteCacheEntry ns ts
|
|||
toRemoteCacheEntry (ProxyEntry _ (Just entry@NodeEntry{})) = toRemoteCacheEntry entry
|
||||
toRemoteCacheEntry _ = Nothing
|
||||
|
||||
-- helper function for use in tests
|
||||
remoteNode_ :: RemoteCacheEntry -> NodeState
|
||||
remoteNode_ (RemoteCacheEntry ns _) = ns
|
||||
-- | extract the 'NodeState' from a 'RemoteCacheEntry'
|
||||
remoteNode :: RemoteCacheEntry -> NodeState
|
||||
remoteNode (RemoteCacheEntry ns _) = ns
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue