diff --git a/Hash2Pub/src/Hash2Pub/ASN1Coding.hs b/Hash2Pub/src/Hash2Pub/ASN1Coding.hs index ed20e1d..faa653f 100644 --- a/Hash2Pub/src/Hash2Pub/ASN1Coding.hs +++ b/Hash2Pub/src/Hash2Pub/ASN1Coding.hs @@ -7,7 +7,7 @@ import Data.ASN1.BinaryEncoding import Data.ASN1.Error() import Data.ASN1.Types -- asn1-types package import Data.ASN1.Parse -import Data.Maybe (fromMaybe, mapMaybe) +import Data.Maybe (fromMaybe, mapMaybe, isNothing) import Data.Time.Clock.POSIX() import qualified Data.ByteString as BS import qualified Data.Set as Set @@ -81,6 +81,8 @@ serialiseMessage :: Int -- maximum message size in bytes -> FediChordMessage -- mesage to be serialised in preparation for sending -> Map.Map Integer BS.ByteString -- list of ASN.1 DER encoded messages together representing -- the contents of the input message +-- messages without payload are not split +serialiseMessage _ msg | isNothing (payload msg) = Map.singleton 1 $ encodeMsg msg serialiseMessage maxBytesLength msg = splitPayloadUntilSmallEnough 1 where @@ -92,21 +94,25 @@ serialiseMessage maxBytesLength msg = | numParts == maximumParts = encodedMsgs numParts | otherwise = splitPayloadUntilSmallEnough $ numParts + 1 messageParts :: Int -> Map.Map Integer FediChordMessage - messageParts i = Map.fromAscList $ foldr (modifyMessage i) [] $ payloadParts i + messageParts i = Map.fromAscList $ foldr (modifyMessage i) [] $ fromMaybe [] $ payloadParts i -- insert payload parts into message and adjust parts metadata modifyMessage :: Int -> (Integer, ActionPayload) -> [(Integer, FediChordMessage)] -> [(Integer, FediChordMessage)] modifyMessage i (partNum, pl) pls = (partNum, msg { part = partNum - , payload = pl + , payload = Just pl , parts = fromIntegral i }):pls -- part starts at 1 - payloadParts :: Int -> [(Integer, ActionPayload)] - payloadParts i = zip [1..] (splitPayload i actionPayload) + payloadParts :: Int -> Maybe [(Integer, ActionPayload)] + payloadParts i = zip [1..] . splitPayload i <$> actionPayload actionPayload = payload msg - encodedMsgs i = Map.map (encodeASN1' DER . encodeMessage) $ messageParts i + encodedMsgs i = Map.map encodeMsg $ messageParts i maxMsgLength = maximum . map BS.length . Map.elems +-- | encode a 'FediChordMessage' to a bytestring without further modification +encodeMsg :: FediChordMessage -> BS.ByteString +encodeMsg = encodeASN1' DER . encodeMessage + -- | Deserialise a ASN.1 DER encoded bytesstring of a single 'FediChordMessage'. deserialiseMessage :: BS.ByteString -> Either String FediChordMessage diff --git a/Hash2Pub/src/Hash2Pub/DHTProtocol.hs b/Hash2Pub/src/Hash2Pub/DHTProtocol.hs index d02b312..a90c304 100644 --- a/Hash2Pub/src/Hash2Pub/DHTProtocol.hs +++ b/Hash2Pub/src/Hash2Pub/DHTProtocol.hs @@ -4,6 +4,7 @@ module Hash2Pub.DHTProtocol ( QueryResponse (..) , queryLocalCache , addCacheEntry + , addCacheEntryPure , deleteCacheEntry , markCacheEntryAsVerified , RemoteCacheEntry(..) @@ -218,4 +219,4 @@ attempts i action = do actionResult <- action case actionResult of Nothing -> attempts (i-1) action - Just res -> return res + Just res -> return $ Just res diff --git a/Hash2Pub/src/Hash2Pub/FediChord.hs b/Hash2Pub/src/Hash2Pub/FediChord.hs index 1bea5af..cdc3b45 100644 --- a/Hash2Pub/src/Hash2Pub/FediChord.hs +++ b/Hash2Pub/src/Hash2Pub/FediChord.hs @@ -39,6 +39,7 @@ module Hash2Pub.FediChord ( , bsAsIpAddr , FediChordConf(..) , fediChordInit + , nodeStateInit , mkServerSocket , resolve , cacheWriter @@ -436,6 +437,14 @@ data FediChordConf = FediChordConf { -- ToDo: load persisted state, thus this function already operates in IO fediChordInit :: FediChordConf -> IO (Socket, NodeState) fediChordInit conf = do + initialState <- nodeStateInit conf + serverSock <- mkServerSocket (ipAddr initialState) (dhtPort initialState) + return (serverSock, initialState) + +-- | initialises the 'NodeState' for this local node. +-- Separated from 'fediChordInit' to be usable in tests. +nodeStateInit :: FediChordConf -> IO NodeState +nodeStateInit conf = do cacheRef <- newIORef initCache q <- atomically newTQueue let @@ -458,10 +467,7 @@ fediChordInit conf = do , pNumParallelQueries = 2 , jEntriesPerSlice = 2 } - serverSock <- mkServerSocket (ipAddr initialState) (dhtPort initialState) - return (serverSock, initialState) - - + return initialState --fediChordJoin :: NodeState -- ^ the local 'NodeState' -- -> (String, PortNumber) -- ^ domain and port of a bootstrapping node @@ -474,6 +480,8 @@ fediChordInit conf = do -- -- ToDo: implement cache management, as already all received replies should be stored in cache -- +-- | cache updater thread that waits for incoming NodeCache update instructions on +-- the node's cacheWriteQueue and then modifies the NodeCache as the single writer. cacheWriter :: NodeState -> IO () cacheWriter ns = do let writeQueue' = getCacheWriteQueue ns diff --git a/Hash2Pub/test/FediChordSpec.hs b/Hash2Pub/test/FediChordSpec.hs index e72b516..c010e0e 100644 --- a/Hash2Pub/test/FediChordSpec.hs +++ b/Hash2Pub/test/FediChordSpec.hs @@ -10,6 +10,7 @@ import qualified Data.ByteString as BS import qualified Data.Set as Set import Data.ASN1.Parse (runParseASN1) import Data.Time.Clock.POSIX +import Data.IORef import Hash2Pub.FediChord import Hash2Pub.DHTProtocol @@ -73,12 +74,12 @@ spec = do (bsAsIpAddr . ipAddrAsBS $ ipAddr exampleNodeState) `shouldBe` ipAddr exampleNodeState describe "NodeCache" $ do let - emptyCache = fromJust $ getNodeCache exampleLocalNode - exampleID = nid exampleLocalNode + emptyCache = initCache anotherID = toNodeID 2^(230::Integer)+1 anotherNode = exampleNodeState { nid = anotherID} maxNode = exampleNodeState { nid = maxBound} - newCache = addCacheWrapper (remoteEntryFromNow exampleLocalNode) =<< addCacheWrapper (remoteEntryFromNow anotherNode) emptyCache + newCache = addCacheEntryPure 10 <$> (RemoteCacheEntry <$> exampleLocalNode <*> pure 10) <*> (addCacheEntryPure 10 <$> pure (RemoteCacheEntry anotherNode 10) <*> pure emptyCache) + exampleID = nid exampleNodeState it "entries can be added to a node cache and looked up again" $ do nC <- newCache -- the cache includes 2 additional proxy elements right from the start @@ -90,12 +91,12 @@ spec = do cacheLookup minBound emptyCache `shouldBe` Nothing cacheLookup maxBound emptyCache `shouldBe` Nothing -- now store a node at that ID - cacheWithMaxNode <- addCacheWrapper (remoteEntryFromNow maxNode) =<< newCache + cacheWithMaxNode <- addCacheEntryPure 10 <$> pure (RemoteCacheEntry maxNode 10) <*> newCache nid . cacheGetNodeStateUnvalidated <$> cacheLookup maxBound cacheWithMaxNode `shouldBe` Just maxBound it "looking up predecessor and successor works like on a modular ring" $ do -- ignore empty proxy elements in initial cache - nid. cacheGetNodeStateUnvalidated <$> cacheLookupPred (exampleID + 10) emptyCache `shouldBe` Nothing - nid. cacheGetNodeStateUnvalidated <$> cacheLookupSucc exampleID emptyCache `shouldBe` Nothing + nid . cacheGetNodeStateUnvalidated <$> cacheLookupPred (exampleID + 10) emptyCache `shouldBe` Nothing + nid . cacheGetNodeStateUnvalidated <$> cacheLookupSucc exampleID emptyCache `shouldBe` Nothing nC <- newCache -- given situation: 0 < nid exampleLocalNode < anotherNode < maxBound @@ -107,11 +108,11 @@ spec = do nid. cacheGetNodeStateUnvalidated <$> cacheLookupPred (exampleID - 2) nC `shouldBe` Just anotherID nid. cacheGetNodeStateUnvalidated <$> cacheLookupSucc (anotherID + 2) nC `shouldBe` Just exampleID -- now store a node in one of the ProxyEntries - cacheWithProxyNodeEntry <- addCacheWrapper (remoteEntryFromNow maxNode) =<< newCache + cacheWithProxyNodeEntry <- addCacheEntryPure 10 <$> pure (RemoteCacheEntry maxNode 10) <*> newCache nid. cacheGetNodeStateUnvalidated <$> cacheLookupPred (exampleID - 2) cacheWithProxyNodeEntry `shouldBe` Just maxBound nid. cacheGetNodeStateUnvalidated <$> cacheLookupSucc (anotherID + 2) cacheWithProxyNodeEntry `shouldBe` Just maxBound it "entries can be deleted" $ do - nC <- addCacheWrapper (remoteEntryFromNow maxNode) =<< newCache + nC <- addCacheEntryPure 10 <$> pure (RemoteCacheEntry maxNode 10) <*> newCache let nc' = deleteCacheEntry maxBound . deleteCacheEntry anotherID $ nC cacheLookup anotherID nc' `shouldBe` Nothing cacheLookup maxBound nc' `shouldBe` Nothing @@ -119,38 +120,36 @@ spec = do describe "NodeCache query lookup" $ do let - emptyCache = fromJust $ getNodeCache exampleLocalNode + emptyCache = initCache nid1 = toNodeID 2^(23::Integer)+1 - node1 = putPredecessors [nid4] $ exampleLocalNode { nid = nid1} + node1 = do + eln <- exampleLocalNode + return $ putPredecessors [nid4] $ eln {nid = nid1} nid2 = toNodeID 2^(230::Integer)+12 node2 = exampleNodeState { nid = nid2} nid3 = toNodeID 2^(25::Integer)+10 node3 = exampleNodeState { nid = nid3} nid4 = toNodeID 2^(9::Integer)+100 node4 = exampleNodeState { nid = nid4} - cacheWith2Entries = addCacheWrapper (remoteEntryFromNow node1) =<< addCacheWrapper (remoteEntryFromNow node2) emptyCache - cacheWith4Entries = addCacheWrapper (remoteEntryFromNow node3) =<< addCacheWrapper (remoteEntryFromNow node4) =<< cacheWith2Entries + cacheWith2Entries :: IO NodeCache + cacheWith2Entries = addCacheEntryPure 10 <$> (RemoteCacheEntry <$> node1 <*> pure 10) <*> pure (addCacheEntryPure 10 (RemoteCacheEntry node2 10) emptyCache) + cacheWith4Entries = addCacheEntryPure 10 (RemoteCacheEntry node3 10) <$> (addCacheEntryPure 10 (RemoteCacheEntry node4 10) <$> cacheWith2Entries) it "works on an empty cache" $ do - queryLocalCache exampleLocalNode emptyCache 3 (toNodeID 2^(9::Integer)+5) `shouldBe` FORWARD Set.empty - queryLocalCache exampleLocalNode emptyCache 1 (toNodeID 2342) `shouldBe` FORWARD Set.empty + queryLocalCache <$> exampleLocalNode <*> pure emptyCache <*> pure 3 <*> pure (toNodeID 2^(9::Integer)+5) `shouldReturn` FORWARD Set.empty + queryLocalCache <$> exampleLocalNode <*> pure emptyCache <*> pure 1 <*> pure (toNodeID 2342) `shouldReturn` FORWARD Set.empty it "works on a cache with less entries than needed" $ do - c2 <- cacheWith2Entries - let (FORWARD nodeset) = queryLocalCache exampleLocalNode c2 4 (toNodeID 2^(9::Integer)+5) + (FORWARD nodeset) <- queryLocalCache <$> exampleLocalNode <*> cacheWith2Entries <*> pure 4 <*> pure (toNodeID 2^(9::Integer)+5) Set.map (nid . remoteNode_) nodeset `shouldBe` Set.fromList [ nid1, nid2 ] it "works on a cache with sufficient entries" $ do - c4 <- cacheWith4Entries - let - (FORWARD nodeset1) = queryLocalCache exampleLocalNode c4 3 (toNodeID 2^(9::Integer)+5) - (FORWARD nodeset2) = queryLocalCache exampleLocalNode c4 1 (toNodeID 2^(9::Integer)+5) + (FORWARD nodeset1) <- queryLocalCache <$> exampleLocalNode <*> cacheWith4Entries <*> pure 3 <*> pure (toNodeID 2^(9::Integer)+5) + (FORWARD nodeset2) <- queryLocalCache <$> exampleLocalNode <*> cacheWith4Entries <*> pure 1 <*> pure (toNodeID 2^(9::Integer)+5) Set.map (nid . remoteNode_) nodeset1 `shouldBe` Set.fromList [nid4, nid2, nid3] Set.map (nid . remoteNode_) nodeset2 `shouldBe` Set.fromList [nid4] it "recognises the node's own responsibility" $ do - nC <- cacheWith4Entries - queryLocalCache node1 nC 3 (toNodeID 2^(22::Integer)) `shouldBe` FOUND node1 - queryLocalCache node1 nC 3 nid1 `shouldBe` FOUND node1 + (==) <$> (queryLocalCache <$> node1 <*> cacheWith4Entries <*> pure 3 <*> pure (toNodeID 2^(22::Integer))) <*> (FOUND <$> node1) `shouldReturn` True + (==) <$> (queryLocalCache <$> node1 <*> cacheWith4Entries <*> pure 3 <*> pure nid1) <*> (FOUND <$> node1) `shouldReturn` True it "does not fail on nodes without neighbours (initial state)" $ do - nC <- cacheWith4Entries - let (FORWARD nodeset) = queryLocalCache exampleLocalNode nC 3 (toNodeID 11) + (FORWARD nodeset) <- queryLocalCache <$> exampleLocalNode <*> cacheWith4Entries <*> pure 3 <*> pure (toNodeID 11) Set.map (nid . remoteNode_ ) nodeset `shouldBe` Set.fromList [nid4, nid2, nid3] describe "Messages can be encoded to and decoded from ASN.1" $ do @@ -229,7 +228,7 @@ spec = do encodeDecodeAndCheck $ responseWith Stabilise stabResPayload encodeDecodeAndCheck $ responseWith Ping pingResPayload it "messages are encoded and decoded to ASN.1 DER properly" $ - deserialiseMessage (head $ serialiseMessage 652 $ responseWith Ping pingResPayload) `shouldBe` Right (responseWith Ping pingResPayload) + deserialiseMessage (fromJust $ Map.lookup 1 (serialiseMessage 652 $ responseWith Ping pingResPayload)) `shouldBe` Right (responseWith Ping pingResPayload) it "messages too large for a single packet can (often) be split into multiple parts" $ do let largeMessage = responseWith Join $ JoinResponsePayload { joinSuccessors = fromInteger <$> [-20..150] @@ -254,33 +253,15 @@ exampleNodeState = NodeState { , internals = Nothing } -exampleInternals :: InternalNodeState -exampleInternals = InternalNodeState { - nodeCache = initCache - , successors = [] - , predecessors = [] - , kNeighbours = 3 - , lNumBestNodes = 3 - , pNumParallelQueries = 2 - , jEntriesPerSlice = 2 - } -exampleLocalNode :: NodeState -exampleLocalNode = exampleNodeState {internals = Just exampleInternals} - +exampleLocalNode :: IO NodeState +exampleLocalNode = nodeStateInit $ FediChordConf { + confDomain = "example.social" + , confIP = exampleIp + , confDhtPort = 2342 + } exampleNodeDomain :: String exampleNodeDomain = "example.social" exampleVs :: (Integral i) => i exampleVs = 4 exampleIp :: HostAddress6 exampleIp = tupleToHostAddress6 (0x2001, 0x16b8, 0x755a, 0xb110, 0x7d6a, 0x12ab, 0xf0c5, 0x386e) - --- | helper function to create a 'RemoteCacheEntry' with the current time stamp -remoteEntryFromNow :: NodeState -> IO RemoteCacheEntry -remoteEntryFromNow ns = RemoteCacheEntry ns <$> getPOSIXTime - --- | helper function for chaining the IO actions of RemoteCacheEntry creation --- and adding to cache -addCacheWrapper :: IO RemoteCacheEntry -> NodeCache -> IO NodeCache -addCacheWrapper entryIO nc = do - entry <- entryIO - addCacheEntry entry nc