Compare commits
	
		
			2 commits
		
	
	
		
			beffab99a0
			...
			99a2b0ba09
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 99a2b0ba09 | |||
| f6c252d314 | 
					 6 changed files with 85 additions and 42 deletions
				
			
		| 
						 | 
				
			
			@ -8,7 +8,7 @@ Action ::= ENUMERATED {queryID, join, leave, stabilise, ping}
 | 
			
		|||
 | 
			
		||||
Request ::= SEQUENCE {
 | 
			
		||||
	action			Action,
 | 
			
		||||
	requestID		INTEGER,
 | 
			
		||||
	requestID		INTEGER (0..4294967295),	-- arbitrarily restricting to an unsigned 32bit integer
 | 
			
		||||
	sender			NodeState,
 | 
			
		||||
	parts			INTEGER (1..150),	-- number of message parts
 | 
			
		||||
	part			INTEGER (1..150),	-- part number of this message, starts at 1
 | 
			
		||||
| 
						 | 
				
			
			@ -25,7 +25,7 @@ Request ::= SEQUENCE {
 | 
			
		|||
-- request and response instead of explicit flag
 | 
			
		||||
 | 
			
		||||
Response ::= SEQUENCE {
 | 
			
		||||
	responseTo		INTEGER,
 | 
			
		||||
	responseTo		INTEGER (0..4294967295),	-- arbitrarily restricting to an unsigned 32bit integer
 | 
			
		||||
	senderID		NodeID,
 | 
			
		||||
	parts			INTEGER (0..150),
 | 
			
		||||
	part			INTEGER (0..150),
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -46,7 +46,7 @@ category:            Network
 | 
			
		|||
extra-source-files:  CHANGELOG.md
 | 
			
		||||
 | 
			
		||||
common deps
 | 
			
		||||
  build-depends:       base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl
 | 
			
		||||
  build-depends:       base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random
 | 
			
		||||
  ghc-options:         -Wall
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -7,7 +7,7 @@ module Hash2Pub.DHTProtocol
 | 
			
		|||
    , markCacheEntryAsVerified
 | 
			
		||||
    , RemoteCacheEntry(..)
 | 
			
		||||
    , toRemoteCacheEntry
 | 
			
		||||
    , remoteNode_
 | 
			
		||||
    , remoteNode
 | 
			
		||||
    , Action(..)
 | 
			
		||||
    , ActionPayload(..)
 | 
			
		||||
    , FediChordMessage(..)
 | 
			
		||||
| 
						 | 
				
			
			@ -15,17 +15,25 @@ module Hash2Pub.DHTProtocol
 | 
			
		|||
    )
 | 
			
		||||
        where
 | 
			
		||||
 | 
			
		||||
import           Control.Concurrent.Async
 | 
			
		||||
import           Control.Concurrent.STM
 | 
			
		||||
import           Control.Concurrent.STM.TBQueue
 | 
			
		||||
import           Control.Concurrent.STM.TQueue
 | 
			
		||||
import           Control.Monad                  (foldM, forM, forM_)
 | 
			
		||||
import qualified Data.ByteString                as BS
 | 
			
		||||
import           Data.Either                    (rights)
 | 
			
		||||
import           Data.Foldable                  (foldl')
 | 
			
		||||
import           Data.IORef
 | 
			
		||||
import qualified Data.Map                       as Map
 | 
			
		||||
import           Data.Maybe                     (fromMaybe, maybe)
 | 
			
		||||
import           Data.Maybe                     (fromJust, fromMaybe, mapMaybe,
 | 
			
		||||
                                                 maybe)
 | 
			
		||||
import qualified Data.Set                       as Set
 | 
			
		||||
import           Data.Time.Clock.POSIX
 | 
			
		||||
import           Network.Socket                 hiding (recv, recvFrom, send,
 | 
			
		||||
                                                 sendTo)
 | 
			
		||||
import           Network.Socket.ByteString
 | 
			
		||||
import           Safe
 | 
			
		||||
import           System.Random
 | 
			
		||||
import           System.Timeout
 | 
			
		||||
 | 
			
		||||
import           Hash2Pub.ASN1Coding
 | 
			
		||||
| 
						 | 
				
			
			@ -34,9 +42,13 @@ import           Hash2Pub.FediChord             (CacheEntry (..), NodeCache,
 | 
			
		|||
                                                 cacheGetNodeStateUnvalidated,
 | 
			
		||||
                                                 cacheLookup, cacheLookupPred,
 | 
			
		||||
                                                 cacheLookupSucc,
 | 
			
		||||
                                                 getCacheWriteQueue,
 | 
			
		||||
                                                 getLNumBestNodes,
 | 
			
		||||
                                                 getNodeCacheRef,
 | 
			
		||||
                                                 getPredecessors, getSuccessors,
 | 
			
		||||
                                                 localCompare, putPredecessors,
 | 
			
		||||
                                                 putSuccessors)
 | 
			
		||||
                                                 localCompare, mkSendSocket,
 | 
			
		||||
                                                 mkServerSocket,
 | 
			
		||||
                                                 putPredecessors, putSuccessors)
 | 
			
		||||
import           Hash2Pub.ProtocolTypes
 | 
			
		||||
 | 
			
		||||
import           Debug.Trace                    (trace)
 | 
			
		||||
| 
						 | 
				
			
			@ -123,36 +135,71 @@ markCacheEntryAsVerified timestamp = Map.adjust adjustFunc
 | 
			
		|||
 | 
			
		||||
-- ====== message send and receive operations ======
 | 
			
		||||
 | 
			
		||||
--requestQueryID :: NodeState -> NodeID -> IO NodeState
 | 
			
		||||
---- 1. do a local lookup for the l closest nodes
 | 
			
		||||
---- 2. create l sockets
 | 
			
		||||
---- 3. send a message async concurrently to all l nodes
 | 
			
		||||
---- 4. collect the results, insert them into cache
 | 
			
		||||
---- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
 | 
			
		||||
--requestQueryID ns targetID = do
 | 
			
		||||
--    cacheSnapshot <- readIORef $ getNodeCacheRef ns
 | 
			
		||||
--    let localResult = queryLocalCache ns cacheSnapshot (fromMaybe 1 $ getLNumBestNodes ns) targetID
 | 
			
		||||
--    -- FOUND can only be returned if targetID is owned by local node
 | 
			
		||||
--    case localResult of
 | 
			
		||||
--      FOUND thisNode -> return thisNode
 | 
			
		||||
--      FORWARD nodeSet ->
 | 
			
		||||
--          sockets <- mapM (\resultNode -> mkSendSocket (domain result) (dhtPort resultNode)) $ Set.toList nodeSet
 | 
			
		||||
--          -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
 | 
			
		||||
--          responses = mapM
 | 
			
		||||
requestQueryID :: NodeState -> NodeID -> IO NodeState
 | 
			
		||||
-- 1. do a local lookup for the l closest nodes
 | 
			
		||||
-- 2. create l sockets
 | 
			
		||||
-- 3. send a message async concurrently to all l nodes
 | 
			
		||||
-- 4. collect the results, insert them into cache
 | 
			
		||||
-- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
 | 
			
		||||
requestQueryID ns targetID = do
 | 
			
		||||
    firstCacheSnapshot <- readIORef $ fromJust . getNodeCacheRef $ ns
 | 
			
		||||
    lookupLoop firstCacheSnapshot
 | 
			
		||||
  where
 | 
			
		||||
    lookupLoop :: NodeCache -> IO NodeState
 | 
			
		||||
    lookupLoop cacheSnapshot = do
 | 
			
		||||
        let localResult = queryLocalCache ns cacheSnapshot (fromMaybe 1 $ getLNumBestNodes ns) targetID
 | 
			
		||||
        -- FOUND can only be returned if targetID is owned by local node
 | 
			
		||||
        case localResult of
 | 
			
		||||
          FOUND thisNode -> pure thisNode
 | 
			
		||||
          FORWARD nodeSet -> do
 | 
			
		||||
              -- create connected sockets to all query targets
 | 
			
		||||
              sockets <- mapM (\resultNode -> mkSendSocket (domain resultNode) (dhtPort resultNode)) $ remoteNode <$> Set.toList nodeSet
 | 
			
		||||
              -- ToDo: make attempts and timeout configurable
 | 
			
		||||
              queryThreads <- mapM (async . sendRequestTo 5000 3 (lookupMessage targetID)) sockets
 | 
			
		||||
              -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
 | 
			
		||||
              -- ToDo: exception handling, maybe log them
 | 
			
		||||
              responses <- (mconcat . fmap Set.elems) . rights <$> mapM waitCatch queryThreads
 | 
			
		||||
              -- insert new cache entries both into global cache as well as in local copy, to make sure it is already up to date at next lookup
 | 
			
		||||
              now <- getPOSIXTime
 | 
			
		||||
              newLCache <- foldM (\oldCache resp -> do
 | 
			
		||||
                let entriesToInsert = case queryResult <$> payload resp of
 | 
			
		||||
                                 Just (FOUND result1) -> [addCacheEntryPure now (RemoteCacheEntry result1 now)]
 | 
			
		||||
                                 Just (FORWARD resultset) -> addCacheEntryPure now <$> Set.elems resultset
 | 
			
		||||
                                 _ -> []
 | 
			
		||||
                -- forward entries to global cache
 | 
			
		||||
                forM_ entriesToInsert $ \entry -> atomically $ writeTQueue (fromJust . getCacheWriteQueue $ ns) entry
 | 
			
		||||
                -- insert entries into local cache copy
 | 
			
		||||
                pure $ foldl' (
 | 
			
		||||
                    \oldLCache insertFunc -> insertFunc oldLCache
 | 
			
		||||
                              ) oldCache entriesToInsert
 | 
			
		||||
                                ) cacheSnapshot responses
 | 
			
		||||
 | 
			
		||||
              -- check for a FOUND and return it
 | 
			
		||||
              let foundResp = headMay . mapMaybe (\resp -> case queryResult <$> payload resp of
 | 
			
		||||
                        Just (FOUND ns') -> Just ns'
 | 
			
		||||
                        _                -> Nothing
 | 
			
		||||
                                               ) $ responses
 | 
			
		||||
              -- if no FOUND, recursively call lookup again
 | 
			
		||||
              maybe (lookupLoop newLCache) pure foundResp
 | 
			
		||||
 | 
			
		||||
    -- todo: random request ID
 | 
			
		||||
    lookupMessage targetID rID = Request rID ns 1 1 QueryID (Just $ pl ns targetID)
 | 
			
		||||
    pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = fromIntegral . fromJust . getLNumBestNodes $ ns }
 | 
			
		||||
 | 
			
		||||
-- | Generic function for sending a request over a connected socket and collecting the response.
 | 
			
		||||
-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
 | 
			
		||||
sendRequestTo :: Int                    -- ^ timeout in seconds
 | 
			
		||||
              -> Int                    -- ^ number of retries
 | 
			
		||||
              -> FediChordMessage       -- ^ the message to be sent
 | 
			
		||||
              -> (Integer -> FediChordMessage)       -- ^ the message to be sent, still needing a requestID
 | 
			
		||||
              -> Socket                 -- ^ connected socket to use for sending
 | 
			
		||||
              -> IO (Set.Set FediChordMessage)  -- ^ responses
 | 
			
		||||
sendRequestTo timeoutMillis numAttempts msg sock = do
 | 
			
		||||
    let requests = serialiseMessage 1200 msg
 | 
			
		||||
sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
 | 
			
		||||
    -- give the message a random request ID
 | 
			
		||||
    randomID <- randomRIO (0, 2^32-1)
 | 
			
		||||
    let requests = serialiseMessage 1200 $ msgIncomplete randomID
 | 
			
		||||
    -- create a queue for passing received response messages back, even after a timeout
 | 
			
		||||
    responseQ <- newTBQueueIO $ 2*maximumParts  -- keep room for duplicate packets
 | 
			
		||||
    -- start sendAndAck with timeout
 | 
			
		||||
    -- ToDo: make attempts and timeout configurable
 | 
			
		||||
    attempts numAttempts . timeout timeoutMillis $ sendAndAck responseQ sock requests
 | 
			
		||||
    -- after timeout, check received responses, delete them from unacked message set/ map and rerun senAndAck with that if necessary.
 | 
			
		||||
    recvdParts <- atomically $ flushTBQueue responseQ
 | 
			
		||||
| 
						 | 
				
			
			@ -190,13 +237,6 @@ sendRequestTo timeoutMillis numAttempts msg sock = do
 | 
			
		|||
                 else recvLoop responseQueue newRemaining receivedPartNums
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
-- idea: send all parts at once
 | 
			
		||||
-- Set/ Map with unacked parts
 | 
			
		||||
-- then recv with timeout for |unackedParts| attempts, receive acked parts from set/ map
 | 
			
		||||
-- how to manage individual retries? nested "attempts"
 | 
			
		||||
 | 
			
		||||
-- | retry an IO action at most *i* times until it delivers a result
 | 
			
		||||
attempts :: Int             -- ^ number of retries *i*
 | 
			
		||||
         -> IO (Maybe a)    -- ^ action to retry
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,4 +1,5 @@
 | 
			
		|||
{-# LANGUAGE DataKinds                  #-}
 | 
			
		||||
{-# LANGUAGE DerivingStrategies         #-}
 | 
			
		||||
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
 | 
			
		||||
{-# LANGUAGE OverloadedStrings          #-}
 | 
			
		||||
{- |
 | 
			
		||||
| 
						 | 
				
			
			@ -24,6 +25,7 @@ module Hash2Pub.FediChord (
 | 
			
		|||
  , getPredecessors
 | 
			
		||||
  , putPredecessors
 | 
			
		||||
  , getLNumBestNodes
 | 
			
		||||
  , getCacheWriteQueue
 | 
			
		||||
  , NodeCache
 | 
			
		||||
  , CacheEntry(..)
 | 
			
		||||
  , cacheGetNodeStateUnvalidated
 | 
			
		||||
| 
						 | 
				
			
			@ -43,6 +45,7 @@ module Hash2Pub.FediChord (
 | 
			
		|||
  , fediChordInit
 | 
			
		||||
  , nodeStateInit
 | 
			
		||||
  , mkServerSocket
 | 
			
		||||
  , mkSendSocket
 | 
			
		||||
  , resolve
 | 
			
		||||
  , cacheWriter
 | 
			
		||||
                           ) where
 | 
			
		||||
| 
						 | 
				
			
			@ -82,7 +85,7 @@ idBits = 256
 | 
			
		|||
--
 | 
			
		||||
-- for being able to check value bounds, the constructor should not be used directly
 | 
			
		||||
-- and new values are created via @toNodeID@ (newtype constructors cannot be hidden)
 | 
			
		||||
newtype NodeID = NodeID { getNodeID :: Integer } deriving (Eq, Show, Enum)
 | 
			
		||||
newtype NodeID = NodeID { getNodeID :: Integer } deriving stock (Show, Eq) deriving newtype Enum
 | 
			
		||||
 | 
			
		||||
-- |smart data constructor for NodeID that throws a runtime exception for out-of-bounds values.
 | 
			
		||||
-- When needing a runtime-safe constructor with drawbacks, try @fromInteger@
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -84,6 +84,6 @@ toRemoteCacheEntry (NodeEntry _ ns ts) = Just $ RemoteCacheEntry ns ts
 | 
			
		|||
toRemoteCacheEntry (ProxyEntry _ (Just entry@NodeEntry{})) = toRemoteCacheEntry entry
 | 
			
		||||
toRemoteCacheEntry _ = Nothing
 | 
			
		||||
 | 
			
		||||
-- helper function for use in tests
 | 
			
		||||
remoteNode_ :: RemoteCacheEntry -> NodeState
 | 
			
		||||
remoteNode_ (RemoteCacheEntry ns _) = ns
 | 
			
		||||
-- | extract the 'NodeState' from a 'RemoteCacheEntry'
 | 
			
		||||
remoteNode :: RemoteCacheEntry -> NodeState
 | 
			
		||||
remoteNode (RemoteCacheEntry ns _) = ns
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -138,12 +138,12 @@ spec = do
 | 
			
		|||
            queryLocalCache <$> exampleLocalNode <*> pure emptyCache <*> pure 1 <*> pure (toNodeID 2342) `shouldReturn` FORWARD Set.empty
 | 
			
		||||
        it "works on a cache with less entries than needed" $ do
 | 
			
		||||
            (FORWARD nodeset) <- queryLocalCache <$> exampleLocalNode <*> cacheWith2Entries <*> pure 4 <*> pure (toNodeID 2^(9::Integer)+5)
 | 
			
		||||
            Set.map (nid . remoteNode_) nodeset `shouldBe` Set.fromList [ nid1, nid2 ]
 | 
			
		||||
            Set.map (nid . remoteNode) nodeset `shouldBe` Set.fromList [ nid1, nid2 ]
 | 
			
		||||
        it "works on a cache with sufficient entries" $ do
 | 
			
		||||
            (FORWARD nodeset1) <- queryLocalCache <$> exampleLocalNode <*> cacheWith4Entries <*> pure 3 <*> pure (toNodeID 2^(9::Integer)+5)
 | 
			
		||||
            (FORWARD nodeset2) <- queryLocalCache <$> exampleLocalNode <*> cacheWith4Entries <*> pure 1 <*> pure (toNodeID 2^(9::Integer)+5)
 | 
			
		||||
            Set.map (nid . remoteNode_) nodeset1 `shouldBe` Set.fromList [nid4, nid2, nid3]
 | 
			
		||||
            Set.map (nid . remoteNode_) nodeset2 `shouldBe` Set.fromList [nid4]
 | 
			
		||||
            Set.map (nid . remoteNode) nodeset1 `shouldBe` Set.fromList [nid4, nid2, nid3]
 | 
			
		||||
            Set.map (nid . remoteNode) nodeset2 `shouldBe` Set.fromList [nid4]
 | 
			
		||||
        it "recognises the node's own responsibility" $ do
 | 
			
		||||
            FOUND selfQueryRes <- queryLocalCache <$> node1 <*> cacheWith4Entries <*> pure 3 <*> pure nid1
 | 
			
		||||
            nid <$> node1 `shouldReturn` nid selfQueryRes
 | 
			
		||||
| 
						 | 
				
			
			@ -151,7 +151,7 @@ spec = do
 | 
			
		|||
            nid <$> node1 `shouldReturn` nid responsibilityResult
 | 
			
		||||
        it "does not fail on nodes without neighbours (initial state)" $ do
 | 
			
		||||
            (FORWARD nodeset) <- queryLocalCache <$> exampleLocalNode <*> cacheWith4Entries <*> pure 3 <*> pure (toNodeID 11)
 | 
			
		||||
            Set.map (nid . remoteNode_ ) nodeset `shouldBe` Set.fromList [nid4, nid2, nid3]
 | 
			
		||||
            Set.map (nid . remoteNode ) nodeset `shouldBe` Set.fromList [nid4, nid2, nid3]
 | 
			
		||||
 | 
			
		||||
    describe "Messages can be encoded to and decoded from ASN.1" $ do
 | 
			
		||||
        -- define test messages
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue