From f6c252d3144e8666a3214364312b6ec0b34e4085 Mon Sep 17 00:00:00 2001
From: Trolli Schmittlauch
Date: Wed, 20 May 2020 18:37:56 +0200
Subject: [PATCH] sending a queryID request compiles (untested)

---
 FediChord.asn1                |   4 +-
 Hash2Pub.cabal                |   2 +-
 src/Hash2Pub/DHTProtocol.hs   | 102 +++++++++++++++++++++++-----------
 src/Hash2Pub/FediChord.hs     |   2 +
 src/Hash2Pub/ProtocolTypes.hs |   6 +-
 test/FediChordSpec.hs         |   8 +--
 6 files changed, 83 insertions(+), 41 deletions(-)

diff --git a/FediChord.asn1 b/FediChord.asn1
index dda8bdc..b80b15a 100644
--- a/FediChord.asn1
+++ b/FediChord.asn1
@@ -8,7 +8,7 @@ Action ::= ENUMERATED {queryID, join, leave, stabilise, ping}
 
 Request ::= SEQUENCE {
     action Action,
-    requestID INTEGER,
+    requestID INTEGER (0..4294967295), -- arbitrarily restricting to an unsigned 32-bit integer
     sender NodeState,
     parts INTEGER (1..150), -- number of message parts
     part INTEGER (1..150), -- part number of this message, starts at 1
@@ -25,7 +25,7 @@ Request ::= SEQUENCE {
 -- request and response instead of explicit flag
 
 Response ::= SEQUENCE {
-    responseTo INTEGER,
+    responseTo INTEGER (0..4294967295), -- arbitrarily restricting to an unsigned 32-bit integer
     senderID NodeID,
     parts INTEGER (0..150),
     part INTEGER (0..150),
diff --git a/Hash2Pub.cabal b/Hash2Pub.cabal
index bf4d856..f1533f4 100644
--- a/Hash2Pub.cabal
+++ b/Hash2Pub.cabal
@@ -46,7 +46,7 @@ category: Network
 extra-source-files: CHANGELOG.md
 
 common deps
-  build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl
+  build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random
   ghc-options: -Wall
 
diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs
index ecc701c..d594b1f 100644
--- a/src/Hash2Pub/DHTProtocol.hs
+++ b/src/Hash2Pub/DHTProtocol.hs
@@ -7,7 +7,7 @@ module Hash2Pub.DHTProtocol
     , markCacheEntryAsVerified
     , RemoteCacheEntry(..)
    , toRemoteCacheEntry
-    , remoteNode_
+    , remoteNode
     , Action(..)
     , ActionPayload(..)
     , FediChordMessage(..)
@@ -15,17 +15,25 @@ module Hash2Pub.DHTProtocol
     )
     where
 
+import           Control.Concurrent.Async
 import           Control.Concurrent.STM
 import           Control.Concurrent.STM.TBQueue
 import           Control.Concurrent.STM.TQueue
+import           Control.Monad                  (foldM, forM, forM_)
 import qualified Data.ByteString                as BS
+import           Data.Either                    (rights)
+import           Data.Foldable                  (foldl')
+import           Data.IORef
 import qualified Data.Map                       as Map
-import           Data.Maybe                     (fromMaybe, maybe)
+import           Data.Maybe                     (fromJust, fromMaybe, mapMaybe,
+                                                 maybe)
 import qualified Data.Set                       as Set
 import           Data.Time.Clock.POSIX
 import           Network.Socket                 hiding (recv, recvFrom, send,
                                                  sendTo)
 import           Network.Socket.ByteString
+import           Safe
+import           System.Random
 import           System.Timeout
 
 import           Hash2Pub.ASN1Coding
@@ -34,9 +42,13 @@ import Hash2Pub.FediChord (CacheEntry (..), NodeCache,
                                                  cacheGetNodeStateUnvalidated,
                                                  cacheLookup, cacheLookupPred,
                                                  cacheLookupSucc,
+                                                 getCacheWriteQueue,
+                                                 getLNumBestNodes,
+                                                 getNodeCacheRef,
                                                  getPredecessors, getSuccessors,
-                                                 localCompare, putPredecessors,
-                                                 putSuccessors)
+                                                 localCompare, mkSendSocket,
+                                                 mkServerSocket,
+                                                 putPredecessors, putSuccessors)
 import           Hash2Pub.ProtocolTypes
 
 import           Debug.Trace                    (trace)
@@ -123,36 +135,71 @@ markCacheEntryAsVerified timestamp = Map.adjust adjustFunc
 
 -- ====== message send and receive operations ======
 
---requestQueryID :: NodeState -> NodeID -> IO NodeState
---- 1. do a local lookup for the l closest nodes
---- 2. create l sockets
---- 3. send a message async concurrently to all l nodes
---- 4. collect the results, insert them into cache
---- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
---requestQueryID ns targetID = do
---    cacheSnapshot <- readIORef $ getNodeCacheRef ns
---    let localResult = queryLocalCache ns cacheSnapshot (fromMaybe 1 $ getLNumBestNodes ns) targetID
---    -- FOUND can only be returned if targetID is owned by local node
---    case localResult of
---        FOUND thisNode -> return thisNode
---        FORWARD nodeSet ->
---            sockets <- mapM (\resultNode -> mkSendSocket (domain result) (dhtPort resultNode)) $ Set.toList nodeSet
---            -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
---            responses = mapM
+requestQueryID :: NodeState -> NodeID -> IO NodeState
+-- 1. do a local lookup for the l closest nodes
+-- 2. create l sockets
+-- 3. send a message async concurrently to all l nodes
+-- 4. collect the results, insert them into cache
+-- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
+requestQueryID ns targetID = do
+    firstCacheSnapshot <- readIORef $ fromJust . getNodeCacheRef $ ns
+    lookupLoop firstCacheSnapshot
+    where
+        lookupLoop :: NodeCache -> IO NodeState
+        lookupLoop cacheSnapshot = do
+            let localResult = queryLocalCache ns cacheSnapshot (fromMaybe 1 $ getLNumBestNodes ns) targetID
+            -- FOUND can only be returned if targetID is owned by local node
+            case localResult of
+                FOUND thisNode -> pure thisNode
+                FORWARD nodeSet -> do
+                    -- create connected sockets to all query targets
+                    sockets <- mapM (\resultNode -> mkSendSocket (domain resultNode) (dhtPort resultNode)) $ remoteNode <$> Set.toList nodeSet
+                    -- ToDo: make attempts and timeout configurable
+                    queryThreads <- mapM (async . sendRequestTo 5000 3 (lookupMessage targetID)) sockets
+                    -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
+                    -- ToDo: exception handling, maybe log them
+                    responses <- (mconcat . fmap Set.elems) . rights <$> mapM waitCatch queryThreads
+                    -- insert new cache entries into both the global cache and the local copy, so the copy is already up to date at the next lookup
+                    now <- getPOSIXTime
+                    newLCache <- foldM (\oldCache resp -> do
+                        let entriesToInsert = case queryResult <$> payload resp of
+                                Just (FOUND result1) -> [addCacheEntryPure now (RemoteCacheEntry result1 now)]
+                                Just (FORWARD resultset) -> addCacheEntryPure now <$> Set.elems resultset
+                                _ -> []
+                        -- forward entries to global cache
+                        forM_ entriesToInsert $ \entry -> atomically $ writeTQueue (fromJust . getCacheWriteQueue $ ns) entry
+                        -- insert entries into local cache copy
+                        pure $ foldl' (
+                            \oldLCache insertFunc -> insertFunc oldLCache
+                            ) oldCache entriesToInsert
+                        ) cacheSnapshot responses
+
+                    -- check for a FOUND and return it
+                    let foundResp = headMay . mapMaybe (\resp -> case queryResult <$> payload resp of
+                            Just (FOUND ns') -> Just ns'
+                            _ -> Nothing
+                            ) $ responses
+                    -- if no FOUND, recursively call lookup again
+                    maybe (lookupLoop newLCache) pure foundResp
+
+        -- the random request ID is filled in by sendRequestTo
+        lookupMessage targetID rID = Request rID ns 1 1 QueryID (Just $ pl ns targetID)
+        pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = fromIntegral . fromJust . getLNumBestNodes $ ns' }
 
 -- | Generic function for sending a request over a connected socket and collecting the response.
 --   Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
 sendRequestTo :: Int -- ^ timeout in milliseconds
     -> Int -- ^ number of retries
-    -> FediChordMessage -- ^ the message to be sent
+    -> (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID
     -> Socket -- ^ connected socket to use for sending
     -> IO (Set.Set FediChordMessage) -- ^ responses
-sendRequestTo timeoutMillis numAttempts msg sock = do
-    let requests = serialiseMessage 1200 msg
+sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
+    -- give the message a random request ID
+    randomID <- randomRIO (0, 2^32-1)
+    let requests = serialiseMessage 1200 $ msgIncomplete randomID
     -- create a queue for passing received response messages back, even after a timeout
     responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets
     -- start sendAndAck with timeout
-    -- ToDo: make attempts and timeout configurable
     attempts numAttempts . timeout timeoutMillis $ sendAndAck responseQ sock requests
     -- after timeout, check received responses, delete them from unacked message set/ map and rerun sendAndAck with that if necessary.
     recvdParts <- atomically $ flushTBQueue responseQ
@@ -190,13 +237,6 @@ sendRequestTo timeoutMillis numAttempts msg sock = do
             else recvLoop responseQueue newRemaining receivedPartNums
-
-
--- idea: send all parts at once
--- Set/ Map with unacked parts
--- then recv with timeout for |unackedParts| attempts, receive acked parts from set/ map
--- how to manage individual retries? nested "attempts"
-
 -- | retry an IO action at most *i* times until it delivers a result
 attempts :: Int -- ^ number of retries *i*
     -> IO (Maybe a) -- ^ action to retry
diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs
index 479d90d..ca87945 100644
--- a/src/Hash2Pub/FediChord.hs
+++ b/src/Hash2Pub/FediChord.hs
@@ -24,6 +24,7 @@ module Hash2Pub.FediChord (
     , getPredecessors
     , putPredecessors
     , getLNumBestNodes
+    , getCacheWriteQueue
     , NodeCache
     , CacheEntry(..)
     , cacheGetNodeStateUnvalidated
@@ -43,6 +44,7 @@ module Hash2Pub.FediChord (
     , fediChordInit
     , nodeStateInit
     , mkServerSocket
+    , mkSendSocket
     , resolve
     , cacheWriter
     ) where
diff --git a/src/Hash2Pub/ProtocolTypes.hs b/src/Hash2Pub/ProtocolTypes.hs
index c7453ab..936832d 100644
--- a/src/Hash2Pub/ProtocolTypes.hs
+++ b/src/Hash2Pub/ProtocolTypes.hs
@@ -84,6 +84,6 @@ toRemoteCacheEntry (NodeEntry _ ns ts) = Just $ RemoteCacheEntry ns ts
 toRemoteCacheEntry (ProxyEntry _ (Just entry@NodeEntry{})) = toRemoteCacheEntry entry
 toRemoteCacheEntry _ = Nothing
 
--- helper function for use in tests
-remoteNode_ :: RemoteCacheEntry -> NodeState
-remoteNode_ (RemoteCacheEntry ns _) = ns
+-- | extract the 'NodeState' from a 'RemoteCacheEntry'
+remoteNode :: RemoteCacheEntry -> NodeState
+remoteNode (RemoteCacheEntry ns _) = ns
diff --git a/test/FediChordSpec.hs b/test/FediChordSpec.hs
index 50f0d66..7889c75 100644
--- a/test/FediChordSpec.hs
+++ b/test/FediChordSpec.hs
@@ -138,12 +138,12 @@ spec = do
                 queryLocalCache <$> exampleLocalNode <*> pure emptyCache <*> pure 1 <*> pure (toNodeID 2342) `shouldReturn` FORWARD Set.empty
             it "works on a cache with less entries than needed" $ do
                 (FORWARD nodeset) <- queryLocalCache <$> exampleLocalNode <*> cacheWith2Entries <*> pure 4 <*> pure (toNodeID 2^(9::Integer)+5)
-                Set.map (nid . remoteNode_) nodeset `shouldBe` Set.fromList [ nid1, nid2 ]
+                Set.map (nid . remoteNode) nodeset `shouldBe` Set.fromList [ nid1, nid2 ]
             it "works on a cache with sufficient entries" $ do
                 (FORWARD nodeset1) <- queryLocalCache <$> exampleLocalNode <*> cacheWith4Entries <*> pure 3 <*> pure (toNodeID 2^(9::Integer)+5)
                 (FORWARD nodeset2) <- queryLocalCache <$> exampleLocalNode <*> cacheWith4Entries <*> pure 1 <*> pure (toNodeID 2^(9::Integer)+5)
-                Set.map (nid . remoteNode_) nodeset1 `shouldBe` Set.fromList [nid4, nid2, nid3]
-                Set.map (nid . remoteNode_) nodeset2 `shouldBe` Set.fromList [nid4]
+                Set.map (nid . remoteNode) nodeset1 `shouldBe` Set.fromList [nid4, nid2, nid3]
+                Set.map (nid . remoteNode) nodeset2 `shouldBe` Set.fromList [nid4]
             it "recognises the node's own responsibility" $ do
                 FOUND selfQueryRes <- queryLocalCache <$> node1 <*> cacheWith4Entries <*> pure 3 <*> pure nid1
                 nid <$> node1 `shouldReturn` nid selfQueryRes
@@ -151,7 +151,7 @@ spec = do
                 nid <$> node1 `shouldReturn` nid responsibilityResult
             it "does not fail on nodes without neighbours (initial state)" $ do
                 (FORWARD nodeset) <- queryLocalCache <$> exampleLocalNode <*> cacheWith4Entries <*> pure 3 <*> pure (toNodeID 11)
-                Set.map (nid . remoteNode_ ) nodeset `shouldBe` Set.fromList [nid4, nid2, nid3]
+                Set.map (nid . remoteNode ) nodeset `shouldBe` Set.fromList [nid4, nid2, nid3]
     describe "Messages can be encoded to and decoded from ASN.1" $ do
         -- define test messages
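
Note for readers following the sendRequestTo changes: the hunk context above cuts off after the first two lines of the attempts signature. Below is a minimal, self-contained sketch of the retry-plus-random-request-ID pattern this patch builds on. The body of attempts and the helper name newRequestID are illustrative assumptions derived from the visible signature and the randomRIO call, not the project's actual definitions (those live in src/Hash2Pub/DHTProtocol.hs):

    import System.Random (randomRIO)

    -- retry an IO action at most i times until it delivers a result
    -- (assumed semantics: return the first Just, give up with Nothing)
    attempts :: Int -> IO (Maybe a) -> IO (Maybe a)
    attempts i action
        | i <= 0    = pure Nothing
        | otherwise = do
            result <- action
            case result of
                Nothing -> attempts (i - 1) action
                justRes -> pure justRes

    -- draw a random request ID within the INTEGER (0..4294967295)
    -- range that FediChord.asn1 now declares for requestID
    newRequestID :: IO Integer
    newRequestID = randomRIO (0, 2 ^ (32 :: Integer) - 1)

Under these assumptions, the existing call attempts numAttempts . timeout timeoutMillis $ sendAndAck ... in sendRequestTo retries the whole send-and-collect step up to numAttempts times, each attempt bounded by the timeout, since timeout returns Nothing when the deadline expires.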