forked from schmittlauch/Hash2Pub
make tests compile again (do not pass yet)
This commit is contained in:
parent
c304e2103f
commit
895606d542
|
@ -7,7 +7,7 @@ import Data.ASN1.BinaryEncoding
|
|||
import Data.ASN1.Error()
|
||||
import Data.ASN1.Types -- asn1-types package
|
||||
import Data.ASN1.Parse
|
||||
import Data.Maybe (fromMaybe, mapMaybe)
|
||||
import Data.Maybe (fromMaybe, mapMaybe, isNothing)
|
||||
import Data.Time.Clock.POSIX()
|
||||
import qualified Data.ByteString as BS
|
||||
import qualified Data.Set as Set
|
||||
|
@ -81,6 +81,8 @@ serialiseMessage :: Int -- maximum message size in bytes
|
|||
-> FediChordMessage -- mesage to be serialised in preparation for sending
|
||||
-> Map.Map Integer BS.ByteString -- list of ASN.1 DER encoded messages together representing
|
||||
-- the contents of the input message
|
||||
-- messages without payload are not split
|
||||
serialiseMessage _ msg | isNothing (payload msg) = Map.singleton 1 $ encodeMsg msg
|
||||
serialiseMessage maxBytesLength msg =
|
||||
splitPayloadUntilSmallEnough 1
|
||||
where
|
||||
|
@ -92,21 +94,25 @@ serialiseMessage maxBytesLength msg =
|
|||
| numParts == maximumParts = encodedMsgs numParts
|
||||
| otherwise = splitPayloadUntilSmallEnough $ numParts + 1
|
||||
messageParts :: Int -> Map.Map Integer FediChordMessage
|
||||
messageParts i = Map.fromAscList $ foldr (modifyMessage i) [] $ payloadParts i
|
||||
messageParts i = Map.fromAscList $ foldr (modifyMessage i) [] $ fromMaybe [] $ payloadParts i
|
||||
-- insert payload parts into message and adjust parts metadata
|
||||
modifyMessage :: Int -> (Integer, ActionPayload) -> [(Integer, FediChordMessage)] -> [(Integer, FediChordMessage)]
|
||||
modifyMessage i (partNum, pl) pls = (partNum, msg {
|
||||
part = partNum
|
||||
, payload = pl
|
||||
, payload = Just pl
|
||||
, parts = fromIntegral i
|
||||
}):pls
|
||||
-- part starts at 1
|
||||
payloadParts :: Int -> [(Integer, ActionPayload)]
|
||||
payloadParts i = zip [1..] (splitPayload i actionPayload)
|
||||
payloadParts :: Int -> Maybe [(Integer, ActionPayload)]
|
||||
payloadParts i = zip [1..] . splitPayload i <$> actionPayload
|
||||
actionPayload = payload msg
|
||||
encodedMsgs i = Map.map (encodeASN1' DER . encodeMessage) $ messageParts i
|
||||
encodedMsgs i = Map.map encodeMsg $ messageParts i
|
||||
maxMsgLength = maximum . map BS.length . Map.elems
|
||||
|
||||
-- | encode a 'FediChordMessage' to a bytestring without further modification
|
||||
encodeMsg :: FediChordMessage -> BS.ByteString
|
||||
encodeMsg = encodeASN1' DER . encodeMessage
|
||||
|
||||
-- | Deserialise a ASN.1 DER encoded bytesstring of a single 'FediChordMessage'.
|
||||
deserialiseMessage :: BS.ByteString
|
||||
-> Either String FediChordMessage
|
||||
|
|
|
@ -4,6 +4,7 @@ module Hash2Pub.DHTProtocol
|
|||
( QueryResponse (..)
|
||||
, queryLocalCache
|
||||
, addCacheEntry
|
||||
, addCacheEntryPure
|
||||
, deleteCacheEntry
|
||||
, markCacheEntryAsVerified
|
||||
, RemoteCacheEntry(..)
|
||||
|
@ -218,4 +219,4 @@ attempts i action = do
|
|||
actionResult <- action
|
||||
case actionResult of
|
||||
Nothing -> attempts (i-1) action
|
||||
Just res -> return res
|
||||
Just res -> return $ Just res
|
||||
|
|
|
@ -39,6 +39,7 @@ module Hash2Pub.FediChord (
|
|||
, bsAsIpAddr
|
||||
, FediChordConf(..)
|
||||
, fediChordInit
|
||||
, nodeStateInit
|
||||
, mkServerSocket
|
||||
, resolve
|
||||
, cacheWriter
|
||||
|
@ -436,6 +437,14 @@ data FediChordConf = FediChordConf {
|
|||
-- ToDo: load persisted state, thus this function already operates in IO
|
||||
fediChordInit :: FediChordConf -> IO (Socket, NodeState)
|
||||
fediChordInit conf = do
|
||||
initialState <- nodeStateInit conf
|
||||
serverSock <- mkServerSocket (ipAddr initialState) (dhtPort initialState)
|
||||
return (serverSock, initialState)
|
||||
|
||||
-- | initialises the 'NodeState' for this local node.
|
||||
-- Separated from 'fediChordInit' to be usable in tests.
|
||||
nodeStateInit :: FediChordConf -> IO NodeState
|
||||
nodeStateInit conf = do
|
||||
cacheRef <- newIORef initCache
|
||||
q <- atomically newTQueue
|
||||
let
|
||||
|
@ -458,10 +467,7 @@ fediChordInit conf = do
|
|||
, pNumParallelQueries = 2
|
||||
, jEntriesPerSlice = 2
|
||||
}
|
||||
serverSock <- mkServerSocket (ipAddr initialState) (dhtPort initialState)
|
||||
return (serverSock, initialState)
|
||||
|
||||
|
||||
return initialState
|
||||
|
||||
--fediChordJoin :: NodeState -- ^ the local 'NodeState'
|
||||
-- -> (String, PortNumber) -- ^ domain and port of a bootstrapping node
|
||||
|
@ -474,6 +480,8 @@ fediChordInit conf = do
|
|||
-- -- ToDo: implement cache management, as already all received replies should be stored in cache
|
||||
--
|
||||
|
||||
-- | cache updater thread that waits for incoming NodeCache update instructions on
|
||||
-- the node's cacheWriteQueue and then modifies the NodeCache as the single writer.
|
||||
cacheWriter :: NodeState -> IO ()
|
||||
cacheWriter ns = do
|
||||
let writeQueue' = getCacheWriteQueue ns
|
||||
|
|
|
@ -10,6 +10,7 @@ import qualified Data.ByteString as BS
|
|||
import qualified Data.Set as Set
|
||||
import Data.ASN1.Parse (runParseASN1)
|
||||
import Data.Time.Clock.POSIX
|
||||
import Data.IORef
|
||||
|
||||
import Hash2Pub.FediChord
|
||||
import Hash2Pub.DHTProtocol
|
||||
|
@ -73,12 +74,12 @@ spec = do
|
|||
(bsAsIpAddr . ipAddrAsBS $ ipAddr exampleNodeState) `shouldBe` ipAddr exampleNodeState
|
||||
describe "NodeCache" $ do
|
||||
let
|
||||
emptyCache = fromJust $ getNodeCache exampleLocalNode
|
||||
exampleID = nid exampleLocalNode
|
||||
emptyCache = initCache
|
||||
anotherID = toNodeID 2^(230::Integer)+1
|
||||
anotherNode = exampleNodeState { nid = anotherID}
|
||||
maxNode = exampleNodeState { nid = maxBound}
|
||||
newCache = addCacheWrapper (remoteEntryFromNow exampleLocalNode) =<< addCacheWrapper (remoteEntryFromNow anotherNode) emptyCache
|
||||
newCache = addCacheEntryPure 10 <$> (RemoteCacheEntry <$> exampleLocalNode <*> pure 10) <*> (addCacheEntryPure 10 <$> pure (RemoteCacheEntry anotherNode 10) <*> pure emptyCache)
|
||||
exampleID = nid exampleNodeState
|
||||
it "entries can be added to a node cache and looked up again" $ do
|
||||
nC <- newCache
|
||||
-- the cache includes 2 additional proxy elements right from the start
|
||||
|
@ -90,7 +91,7 @@ spec = do
|
|||
cacheLookup minBound emptyCache `shouldBe` Nothing
|
||||
cacheLookup maxBound emptyCache `shouldBe` Nothing
|
||||
-- now store a node at that ID
|
||||
cacheWithMaxNode <- addCacheWrapper (remoteEntryFromNow maxNode) =<< newCache
|
||||
cacheWithMaxNode <- addCacheEntryPure 10 <$> pure (RemoteCacheEntry maxNode 10) <*> newCache
|
||||
nid . cacheGetNodeStateUnvalidated <$> cacheLookup maxBound cacheWithMaxNode `shouldBe` Just maxBound
|
||||
it "looking up predecessor and successor works like on a modular ring" $ do
|
||||
-- ignore empty proxy elements in initial cache
|
||||
|
@ -107,11 +108,11 @@ spec = do
|
|||
nid. cacheGetNodeStateUnvalidated <$> cacheLookupPred (exampleID - 2) nC `shouldBe` Just anotherID
|
||||
nid. cacheGetNodeStateUnvalidated <$> cacheLookupSucc (anotherID + 2) nC `shouldBe` Just exampleID
|
||||
-- now store a node in one of the ProxyEntries
|
||||
cacheWithProxyNodeEntry <- addCacheWrapper (remoteEntryFromNow maxNode) =<< newCache
|
||||
cacheWithProxyNodeEntry <- addCacheEntryPure 10 <$> pure (RemoteCacheEntry maxNode 10) <*> newCache
|
||||
nid. cacheGetNodeStateUnvalidated <$> cacheLookupPred (exampleID - 2) cacheWithProxyNodeEntry `shouldBe` Just maxBound
|
||||
nid. cacheGetNodeStateUnvalidated <$> cacheLookupSucc (anotherID + 2) cacheWithProxyNodeEntry `shouldBe` Just maxBound
|
||||
it "entries can be deleted" $ do
|
||||
nC <- addCacheWrapper (remoteEntryFromNow maxNode) =<< newCache
|
||||
nC <- addCacheEntryPure 10 <$> pure (RemoteCacheEntry maxNode 10) <*> newCache
|
||||
let nc' = deleteCacheEntry maxBound . deleteCacheEntry anotherID $ nC
|
||||
cacheLookup anotherID nc' `shouldBe` Nothing
|
||||
cacheLookup maxBound nc' `shouldBe` Nothing
|
||||
|
@ -119,38 +120,36 @@ spec = do
|
|||
|
||||
describe "NodeCache query lookup" $ do
|
||||
let
|
||||
emptyCache = fromJust $ getNodeCache exampleLocalNode
|
||||
emptyCache = initCache
|
||||
nid1 = toNodeID 2^(23::Integer)+1
|
||||
node1 = putPredecessors [nid4] $ exampleLocalNode { nid = nid1}
|
||||
node1 = do
|
||||
eln <- exampleLocalNode
|
||||
return $ putPredecessors [nid4] $ eln {nid = nid1}
|
||||
nid2 = toNodeID 2^(230::Integer)+12
|
||||
node2 = exampleNodeState { nid = nid2}
|
||||
nid3 = toNodeID 2^(25::Integer)+10
|
||||
node3 = exampleNodeState { nid = nid3}
|
||||
nid4 = toNodeID 2^(9::Integer)+100
|
||||
node4 = exampleNodeState { nid = nid4}
|
||||
cacheWith2Entries = addCacheWrapper (remoteEntryFromNow node1) =<< addCacheWrapper (remoteEntryFromNow node2) emptyCache
|
||||
cacheWith4Entries = addCacheWrapper (remoteEntryFromNow node3) =<< addCacheWrapper (remoteEntryFromNow node4) =<< cacheWith2Entries
|
||||
cacheWith2Entries :: IO NodeCache
|
||||
cacheWith2Entries = addCacheEntryPure 10 <$> (RemoteCacheEntry <$> node1 <*> pure 10) <*> pure (addCacheEntryPure 10 (RemoteCacheEntry node2 10) emptyCache)
|
||||
cacheWith4Entries = addCacheEntryPure 10 (RemoteCacheEntry node3 10) <$> (addCacheEntryPure 10 (RemoteCacheEntry node4 10) <$> cacheWith2Entries)
|
||||
it "works on an empty cache" $ do
|
||||
queryLocalCache exampleLocalNode emptyCache 3 (toNodeID 2^(9::Integer)+5) `shouldBe` FORWARD Set.empty
|
||||
queryLocalCache exampleLocalNode emptyCache 1 (toNodeID 2342) `shouldBe` FORWARD Set.empty
|
||||
queryLocalCache <$> exampleLocalNode <*> pure emptyCache <*> pure 3 <*> pure (toNodeID 2^(9::Integer)+5) `shouldReturn` FORWARD Set.empty
|
||||
queryLocalCache <$> exampleLocalNode <*> pure emptyCache <*> pure 1 <*> pure (toNodeID 2342) `shouldReturn` FORWARD Set.empty
|
||||
it "works on a cache with less entries than needed" $ do
|
||||
c2 <- cacheWith2Entries
|
||||
let (FORWARD nodeset) = queryLocalCache exampleLocalNode c2 4 (toNodeID 2^(9::Integer)+5)
|
||||
(FORWARD nodeset) <- queryLocalCache <$> exampleLocalNode <*> cacheWith2Entries <*> pure 4 <*> pure (toNodeID 2^(9::Integer)+5)
|
||||
Set.map (nid . remoteNode_) nodeset `shouldBe` Set.fromList [ nid1, nid2 ]
|
||||
it "works on a cache with sufficient entries" $ do
|
||||
c4 <- cacheWith4Entries
|
||||
let
|
||||
(FORWARD nodeset1) = queryLocalCache exampleLocalNode c4 3 (toNodeID 2^(9::Integer)+5)
|
||||
(FORWARD nodeset2) = queryLocalCache exampleLocalNode c4 1 (toNodeID 2^(9::Integer)+5)
|
||||
(FORWARD nodeset1) <- queryLocalCache <$> exampleLocalNode <*> cacheWith4Entries <*> pure 3 <*> pure (toNodeID 2^(9::Integer)+5)
|
||||
(FORWARD nodeset2) <- queryLocalCache <$> exampleLocalNode <*> cacheWith4Entries <*> pure 1 <*> pure (toNodeID 2^(9::Integer)+5)
|
||||
Set.map (nid . remoteNode_) nodeset1 `shouldBe` Set.fromList [nid4, nid2, nid3]
|
||||
Set.map (nid . remoteNode_) nodeset2 `shouldBe` Set.fromList [nid4]
|
||||
it "recognises the node's own responsibility" $ do
|
||||
nC <- cacheWith4Entries
|
||||
queryLocalCache node1 nC 3 (toNodeID 2^(22::Integer)) `shouldBe` FOUND node1
|
||||
queryLocalCache node1 nC 3 nid1 `shouldBe` FOUND node1
|
||||
(==) <$> (queryLocalCache <$> node1 <*> cacheWith4Entries <*> pure 3 <*> pure (toNodeID 2^(22::Integer))) <*> (FOUND <$> node1) `shouldReturn` True
|
||||
(==) <$> (queryLocalCache <$> node1 <*> cacheWith4Entries <*> pure 3 <*> pure nid1) <*> (FOUND <$> node1) `shouldReturn` True
|
||||
it "does not fail on nodes without neighbours (initial state)" $ do
|
||||
nC <- cacheWith4Entries
|
||||
let (FORWARD nodeset) = queryLocalCache exampleLocalNode nC 3 (toNodeID 11)
|
||||
(FORWARD nodeset) <- queryLocalCache <$> exampleLocalNode <*> cacheWith4Entries <*> pure 3 <*> pure (toNodeID 11)
|
||||
Set.map (nid . remoteNode_ ) nodeset `shouldBe` Set.fromList [nid4, nid2, nid3]
|
||||
|
||||
describe "Messages can be encoded to and decoded from ASN.1" $ do
|
||||
|
@ -229,7 +228,7 @@ spec = do
|
|||
encodeDecodeAndCheck $ responseWith Stabilise stabResPayload
|
||||
encodeDecodeAndCheck $ responseWith Ping pingResPayload
|
||||
it "messages are encoded and decoded to ASN.1 DER properly" $
|
||||
deserialiseMessage (head $ serialiseMessage 652 $ responseWith Ping pingResPayload) `shouldBe` Right (responseWith Ping pingResPayload)
|
||||
deserialiseMessage (fromJust $ Map.lookup 1 (serialiseMessage 652 $ responseWith Ping pingResPayload)) `shouldBe` Right (responseWith Ping pingResPayload)
|
||||
it "messages too large for a single packet can (often) be split into multiple parts" $ do
|
||||
let largeMessage = responseWith Join $ JoinResponsePayload {
|
||||
joinSuccessors = fromInteger <$> [-20..150]
|
||||
|
@ -254,33 +253,15 @@ exampleNodeState = NodeState {
|
|||
, internals = Nothing
|
||||
}
|
||||
|
||||
exampleInternals :: InternalNodeState
|
||||
exampleInternals = InternalNodeState {
|
||||
nodeCache = initCache
|
||||
, successors = []
|
||||
, predecessors = []
|
||||
, kNeighbours = 3
|
||||
, lNumBestNodes = 3
|
||||
, pNumParallelQueries = 2
|
||||
, jEntriesPerSlice = 2
|
||||
exampleLocalNode :: IO NodeState
|
||||
exampleLocalNode = nodeStateInit $ FediChordConf {
|
||||
confDomain = "example.social"
|
||||
, confIP = exampleIp
|
||||
, confDhtPort = 2342
|
||||
}
|
||||
exampleLocalNode :: NodeState
|
||||
exampleLocalNode = exampleNodeState {internals = Just exampleInternals}
|
||||
|
||||
exampleNodeDomain :: String
|
||||
exampleNodeDomain = "example.social"
|
||||
exampleVs :: (Integral i) => i
|
||||
exampleVs = 4
|
||||
exampleIp :: HostAddress6
|
||||
exampleIp = tupleToHostAddress6 (0x2001, 0x16b8, 0x755a, 0xb110, 0x7d6a, 0x12ab, 0xf0c5, 0x386e)
|
||||
|
||||
-- | helper function to create a 'RemoteCacheEntry' with the current time stamp
|
||||
remoteEntryFromNow :: NodeState -> IO RemoteCacheEntry
|
||||
remoteEntryFromNow ns = RemoteCacheEntry ns <$> getPOSIXTime
|
||||
|
||||
-- | helper function for chaining the IO actions of RemoteCacheEntry creation
|
||||
-- and adding to cache
|
||||
addCacheWrapper :: IO RemoteCacheEntry -> NodeCache -> IO NodeCache
|
||||
addCacheWrapper entryIO nc = do
|
||||
entry <- entryIO
|
||||
addCacheEntry entry nc
|
||||
|
|
Loading…
Reference in a new issue