Compare commits

..

3 commits

13 changed files with 297 additions and 283 deletions

View file

@@ -1,8 +0,0 @@
- group: {name: generalise, enabled: true}
- warn: { name: Use DerivingStrategies }
- error: { lhs: return, rhs: pure }
- ignore: {name: "Avoid lambda using `infix`"}

View file

@@ -46,7 +46,7 @@ category: Network
extra-source-files: CHANGELOG.md
common deps
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl
ghc-options: -Wall

View file

@@ -23,7 +23,6 @@ in
haddock
cabal-install
hlint
stylish-haskell
pkgs.python3Packages.asn1ate
];
};

View file

@@ -1,9 +1,9 @@
import Data.Map.Internal.Debug (showTree)
import qualified Data.Map.Strict as Map
import Hash2Pub.FediChord
import Hash2Pub.FediChord
import Data.Map.Internal.Debug (showTree)
import qualified Data.Map.Strict as Map
giebMalCache :: [Integer] -> Map.Map NodeID ()
giebMalCache = Map.fromList . fmap (mkCacheEntry . fromInteger)
giebMalCache = Map.fromList . map (mkCacheEntry . fromInteger)
where
mkCacheEntry nodeid = (nodeid, ())
@@ -18,11 +18,11 @@ nidLookupGT m = flip Map.lookupGT m . fromInteger
edgeCase1 :: IO ()
edgeCase1 = do
putStrLn "Let there be a Map with the keys [2^255+2^254+3, 2, 2^253], all keys are NodeIDs mod 2^256."
print testOverlap
print testOverlap
putStrLn "\nWhile (NodeID 2^255+2^254+3) > (NodeID 2^254 + 14) …"
print $ toNodeID (2^255+2^254+3) > toNodeID (2^254+14)
putStrLn "… and 2^255+2^254+3 is an element of the map…"
print $ Map.member (fromInteger (2^255+2^254+3)) testOverlap
print $ Map.member (fromInteger 2^255+2^254+3) testOverlap
putStrLn "… looking for an element larger than 2^254 + 14 doesn't yield any."
print $ nidLookupGT testOverlap (2^254+14)
putStrLn "\nThat's the tree of the map:"

View file

@@ -2,25 +2,25 @@
module Hash2Pub.ASN1Coding where
import Control.Exception (displayException)
import Data.ASN1.BinaryEncoding
import Data.ASN1.Encoding
import Data.ASN1.Error ()
import Data.ASN1.Parse
import Data.ASN1.Types
import Data.Bifunctor (first)
import qualified Data.ByteString as BS
import qualified Data.Map.Strict as Map
import Data.Maybe (fromMaybe, isNothing, mapMaybe)
import qualified Data.Set as Set
import Data.Time.Clock.POSIX ()
import Safe
import Data.ASN1.Encoding -- asn1-encoding package
import Data.ASN1.BinaryEncoding
import Data.ASN1.Error()
import Data.ASN1.Types -- asn1-types package
import Data.ASN1.Parse
import Data.Maybe (fromMaybe, mapMaybe, isNothing)
import Data.Time.Clock.POSIX()
import qualified Data.ByteString as BS
import qualified Data.Set as Set
import qualified Data.Map.Strict as Map
import Data.Bifunctor (first)
import Control.Exception (displayException)
import Safe
import Hash2Pub.DHTProtocol
import Hash2Pub.FediChord
import Hash2Pub.Utils
import Hash2Pub.FediChord
import Hash2Pub.Utils
import Hash2Pub.DHTProtocol
import Debug.Trace
import Debug.Trace
-- TODO: make this splitting function more intelligent, currently it creates many parts that are smaller than they could be, see #18
-- | Try splitting a payload into multiple parts to be able to reduce size of
@@ -107,7 +107,7 @@ serialiseMessage maxBytesLength msg =
payloadParts i = zip [1..] . splitPayload i <$> actionPayload
actionPayload = payload msg
encodedMsgs i = Map.map encodeMsg $ messageParts i
maxMsgLength = maximum . fmap BS.length . Map.elems
maxMsgLength = maximum . map BS.length . Map.elems
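
The TODO above (#18) notes that the current splitting produces more, and smaller, parts than necessary. As a rough illustration of one alternative strategy only — splitEvenly is a hypothetical name and not part of this commit — a list-shaped payload could be divided into a fixed number of roughly equal chunks using the chunksOf helper from Hash2Pub.Utils, which this module already imports:

    import Hash2Pub.Utils (chunksOf)

    -- Hypothetical sketch: split a list into at most numParts chunks of roughly
    -- equal size; the real splitPayload would apply this per ActionPayload field.
    splitEvenly :: Int -> [a] -> [[a]]
    splitEvenly numParts xs = chunksOf partSize xs
      where
        partSize = max 1 (ceiling (fromIntegral (length xs) / fromIntegral numParts :: Double))
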
-- | encode a 'FediChordMessage' to a bytestring without further modification
encodeMsg :: FediChordMessage -> BS.ByteString
@@ -124,39 +124,39 @@ deserialiseMessage msgBytes = first displayException (decodeASN1' DER msgBytes)
-- indicated by the data constructor, as ASN.1
encodePayload :: ActionPayload -> [ASN1]
encodePayload LeaveResponsePayload = [Null]
encodePayload payload'@LeaveRequestPayload{} =
encodePayload payload'@LeaveRequestPayload{} =
Start Sequence
: Start Sequence
: fmap (IntVal . getNodeID) (leaveSuccessors payload')
<> [End Sequence
: map (IntVal . getNodeID) (leaveSuccessors payload')
++ [End Sequence
, Start Sequence]
<> fmap (IntVal . getNodeID) (leavePredecessors payload')
<> [End Sequence
++ map (IntVal . getNodeID) (leavePredecessors payload')
++ [End Sequence
, End Sequence]
-- currently StabiliseResponsePayload and LeaveRequestPayload are equal
encodePayload payload'@StabiliseResponsePayload{} =
encodePayload payload'@StabiliseResponsePayload{} =
Start Sequence
: Start Sequence
: fmap (IntVal . getNodeID) (stabiliseSuccessors payload')
<> [End Sequence
: map (IntVal . getNodeID) (stabiliseSuccessors payload')
++ [End Sequence
, Start Sequence]
<> fmap (IntVal . getNodeID) (stabilisePredecessors payload')
<> [End Sequence
++ map (IntVal . getNodeID) (stabilisePredecessors payload')
++ [End Sequence
, End Sequence]
encodePayload payload'@StabiliseRequestPayload = [Null]
encodePayload payload'@QueryIDResponsePayload{} =
encodePayload payload'@QueryIDResponsePayload{} =
let
resp = queryResult payload'
in
Start Sequence
: encodeQueryResult resp
: case resp of
FOUND ns -> encodeNodeState ns
FOUND ns -> encodeNodeState $ ns
FORWARD entrySet ->
Start Sequence
: (concatMap encodeCacheEntry . Set.elems $ entrySet)
<> [End Sequence]
<> [End Sequence]
++ [End Sequence]
++ [End Sequence]
encodePayload payload'@QueryIDRequestPayload{} = [
Start Sequence
, IntVal . getNodeID $ queryTargetID payload'
@@ -167,21 +167,21 @@ encodePayload payload'@QueryIDRequestPayload{} = [
encodePayload payload'@JoinResponsePayload{} =
Start Sequence
: Start Sequence
: fmap (IntVal . getNodeID) (joinSuccessors payload')
<> [End Sequence
: map (IntVal . getNodeID) (joinSuccessors payload')
++ [End Sequence
, Start Sequence]
<> fmap (IntVal . getNodeID) (joinPredecessors payload')
<> [End Sequence
++ map (IntVal . getNodeID) (joinPredecessors payload')
++ [End Sequence
, Start Sequence]
<> concatMap encodeCacheEntry (joinCache payload')
<> [End Sequence
++ concatMap encodeCacheEntry (joinCache payload')
++ [End Sequence
, End Sequence]
encodePayload payload'@JoinRequestPayload{} = [Null]
encodePayload PingRequestPayload{} = [Null]
encodePayload payload'@PingResponsePayload{} =
Start Sequence
: concatMap encodeNodeState (pingNodeStates payload')
<> [End Sequence]
++ [End Sequence]
encodeNodeState :: NodeState -> [ASN1]
encodeNodeState ns = [
@@ -200,13 +200,13 @@ encodeCacheEntry (RemoteCacheEntry ns timestamp) =
Start Sequence
: encodeNodeState ns
-- ToDo: possibly optimise this by using dlists
<> [
++ [
IntVal . fromIntegral . fromEnum $ timestamp
, End Sequence]
encodeCacheEntry _ = []
encodeQueryResult :: QueryResponse -> ASN1
encodeQueryResult FOUND{} = Enumerated 0
encodeQueryResult FOUND{} = Enumerated 0
encodeQueryResult FORWARD{} = Enumerated 1
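
The ToDo above mentions difference lists as a way to avoid the repeated list appends in these encoders. A minimal sketch of the idea, using plain functions rather than the dlist package — none of this is part of the commit:

    -- A difference list represents a list as a function that prepends it;
    -- appending then becomes function composition, i.e. O(1) instead of O(n).
    type DList a = [a] -> [a]

    dlFromList :: [a] -> DList a
    dlFromList = (++)

    dlToList :: DList a -> [a]
    dlToList dl = dl []

    -- e.g. dlToList (dlFromList [Start Sequence] . dlFromList body . dlFromList [End Sequence])
    -- builds the element stream without repeatedly traversing the front part.
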
-- | Encode a 'FediChordMessage' as ASN.1.
@@ -218,11 +218,11 @@ encodeMessage
: (Enumerated . fromIntegral . fromEnum $ action)
: IntVal requestID
: encodeNodeState sender
<> [
++ [
IntVal parts
, IntVal part ]
<> maybe [] encodePayload requestPayload
<> [End Sequence]
++ maybe [] encodePayload requestPayload
++ [End Sequence]
encodeMessage
(Response responseTo senderID parts part action responsePayload) = [
Start Sequence
@@ -231,8 +231,8 @@ encodeMessage
, IntVal parts
, IntVal part
, Enumerated . fromIntegral . fromEnum $ action]
<> maybe [] encodePayload responsePayload
<> [End Sequence]
++ maybe [] encodePayload responsePayload
++ [End Sequence]
-- ===== parser combinators =====
@@ -240,21 +240,21 @@ parseMessage :: ParseASN1 FediChordMessage
parseMessage = do
begin <- getNext
case begin of
Start Sequence -> pure ()
x -> throwParseError $ "unexpected ASN.1 element " <> show x
Start Sequence -> return ()
x -> throwParseError $ "unexpected ASN.1 element " ++ show x
-- request and response messages are distinguishable by their structure,
-- see ASN.1 schema
firstElem <- getNext
message <- case firstElem of
Enumerated a -> parseRequest . toEnum . fromIntegral $ a
IntVal i -> parseResponse i
other -> throwParseError $ "unexpected first ASN1 element: " <> show other
other -> throwParseError $ "unexpected first ASN1 element: " ++ show other
-- consume sequence end
end <- getNext
case end of
End Sequence -> pure ()
x -> throwParseError $ "unexpected ASN.1 element " <> show x
pure message
End Sequence -> return ()
x -> throwParseError $ "unexpected ASN.1 element " ++ show x
return message
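
For orientation, running these parser combinators on a received datagram typically looks like the sketch below. decodeAndParse is a hypothetical stand-in for what deserialiseMessage (shown truncated in the hunk header above) presumably does, combining decodeASN1' from asn1-encoding with runParseASN1 from asn1-parse:

    import Control.Exception (displayException)
    import Data.ASN1.BinaryEncoding (DER (..))
    import Data.ASN1.Encoding (decodeASN1')
    import Data.ASN1.Parse (runParseASN1)
    import Data.Bifunctor (first)
    import qualified Data.ByteString as BS

    -- Hypothetical wrapper: decode DER bytes, then run the parser combinators.
    decodeAndParse :: BS.ByteString -> Either String FediChordMessage
    decodeAndParse bytes =
        first displayException (decodeASN1' DER bytes) >>= runParseASN1 parseMessage
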
@@ -265,14 +265,14 @@ parseRequest action = do
parts <- parseInteger
part <- parseInteger
hasPayload <- hasNext
payload <- if not hasPayload then pure Nothing else Just <$> case action of
QueryID -> parseQueryIDRequest
Join -> parseJoinRequest
Leave -> parseLeaveRequest
payload <- if not hasPayload then return Nothing else Just <$> case action of
QueryID -> parseQueryIDRequest
Join -> parseJoinRequest
Leave -> parseLeaveRequest
Stabilise -> parseStabiliseRequest
Ping -> parsePingRequest
Ping -> parsePingRequest
pure $ Request requestID sender parts part action payload
return $ Request requestID sender parts part action payload
parseResponse :: Integer -> ParseASN1 FediChordMessage
parseResponse responseTo = do
@@ -281,49 +281,49 @@ parseResponse responseTo = do
part <- parseInteger
action <- parseEnum :: ParseASN1 Action
hasPayload <- hasNext
payload <- if not hasPayload then pure Nothing else Just <$> case action of
QueryID -> parseQueryIDResponse
Join -> parseJoinResponse
Leave -> parseLeaveResponse
payload <- if not hasPayload then return Nothing else Just <$> case action of
QueryID -> parseQueryIDResponse
Join -> parseJoinResponse
Leave -> parseLeaveResponse
Stabilise -> parseStabiliseResponse
Ping -> parsePingResponse
Ping -> parsePingResponse
pure $ Response responseTo senderID parts part action payload
return $ Response responseTo senderID parts part action payload
parseInteger :: ParseASN1 Integer
parseInteger = do
i <- getNext
case i of
IntVal parsed -> pure parsed
x -> throwParseError $ "Expected IntVal but got " <> show x
IntVal parsed -> return parsed
x -> throwParseError $ "Expected IntVal but got " ++ show x
parseEnum :: Enum a => ParseASN1 a
parseEnum = do
e <- getNext
case e of
Enumerated en -> pure $ toEnum . fromIntegral $ en
x -> throwParseError $ "Expected Enumerated but got " <> show x
Enumerated en -> return $ toEnum . fromIntegral $ en
x -> throwParseError $ "Expected Enumerated but got " ++ show x
parseString :: ParseASN1 String
parseString = do
s <- getNext
case s of
ASN1String toBeParsed -> maybe (throwParseError "string parsing failed") pure $ asn1CharacterToString toBeParsed
x -> throwParseError $ "Expected a ASN1String but got " <> show x
ASN1String toBeParsed -> maybe (throwParseError "string parsing failed") return $ asn1CharacterToString toBeParsed
x -> throwParseError $ "Expected a ASN1String but got " ++ show x
parseOctets :: ParseASN1 BS.ByteString
parseOctets = do
os <- getNext
case os of
OctetString bs -> pure bs
x -> throwParseError $ "Expected an OctetString but got " <> show x
OctetString bs -> return bs
x -> throwParseError $ "Expected an OctetString but got " ++ show x
parseNull :: ParseASN1 ()
parseNull = do
n <- getNext
case n of
Null -> pure ()
x -> throwParseError $ "Expected Null but got " <> show x
Null -> return ()
x -> throwParseError $ "Expected Null but got " ++ show x
parseNodeState :: ParseASN1 NodeState
parseNodeState = onNextContainer Sequence $ do
@@ -333,7 +333,7 @@ parseNodeState = onNextContainer Sequence $ do
dhtPort' <- fromInteger <$> parseInteger
apPort' <- fromInteger <$> parseInteger
vServer' <- parseInteger
pure NodeState {
return NodeState {
nid = nid'
, domain = domain'
, dhtPort = dhtPort'
@@ -348,7 +348,7 @@ parseCacheEntry :: ParseASN1 RemoteCacheEntry
parseCacheEntry = onNextContainer Sequence $ do
node <- parseNodeState
timestamp <- toEnum . fromIntegral <$> parseInteger
pure $ RemoteCacheEntry node timestamp
return $ RemoteCacheEntry node timestamp
parseNodeCache :: ParseASN1 [RemoteCacheEntry]
parseNodeCache = onNextContainer Sequence $ getMany parseCacheEntry
@@ -356,14 +356,14 @@ parseNodeCache = onNextContainer Sequence $ getMany parseCacheEntry
parseJoinRequest :: ParseASN1 ActionPayload
parseJoinRequest = do
parseNull
pure JoinRequestPayload
return JoinRequestPayload
parseJoinResponse :: ParseASN1 ActionPayload
parseJoinResponse = onNextContainer Sequence $ do
succ' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger)
pred' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger)
succ' <- map fromInteger <$> onNextContainer Sequence (getMany parseInteger)
pred' <- map fromInteger <$> onNextContainer Sequence (getMany parseInteger)
cache <- parseNodeCache
pure $ JoinResponsePayload {
return $ JoinResponsePayload {
joinSuccessors = succ'
, joinPredecessors = pred'
, joinCache = cache
@@ -373,7 +373,7 @@ parseQueryIDRequest :: ParseASN1 ActionPayload
parseQueryIDRequest = onNextContainer Sequence $ do
targetID <- fromInteger <$> parseInteger
lBestNodes <- parseInteger
pure $ QueryIDRequestPayload {
return $ QueryIDRequestPayload {
queryTargetID = targetID
, queryLBestNodes = lBestNodes
}
@@ -385,29 +385,29 @@ parseQueryIDResponse = onNextContainer Sequence $ do
0 -> FOUND <$> parseNodeState
1 -> FORWARD . Set.fromList <$> parseNodeCache
_ -> throwParseError "invalid QueryIDResponse type"
pure $ QueryIDResponsePayload {
return $ QueryIDResponsePayload {
queryResult = result
}
parseStabiliseRequest :: ParseASN1 ActionPayload
parseStabiliseRequest = do
parseNull
pure StabiliseRequestPayload
return StabiliseRequestPayload
parseStabiliseResponse :: ParseASN1 ActionPayload
parseStabiliseResponse = onNextContainer Sequence $ do
succ' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger)
pred' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger)
pure $ StabiliseResponsePayload {
succ' <- map fromInteger <$> onNextContainer Sequence (getMany parseInteger)
pred' <- map fromInteger <$> onNextContainer Sequence (getMany parseInteger)
return $ StabiliseResponsePayload {
stabiliseSuccessors = succ'
, stabilisePredecessors = pred'
}
parseLeaveRequest :: ParseASN1 ActionPayload
parseLeaveRequest = onNextContainer Sequence $ do
succ' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger)
pred' <- fmap fromInteger <$> onNextContainer Sequence (getMany parseInteger)
pure $ LeaveRequestPayload {
succ' <- map fromInteger <$> onNextContainer Sequence (getMany parseInteger)
pred' <- map fromInteger <$> onNextContainer Sequence (getMany parseInteger)
return $ LeaveRequestPayload {
leaveSuccessors = succ'
, leavePredecessors = pred'
}
@@ -415,16 +415,16 @@ parseLeaveRequest = onNextContainer Sequence $ do
parseLeaveResponse :: ParseASN1 ActionPayload
parseLeaveResponse = do
parseNull
pure LeaveResponsePayload
return LeaveResponsePayload
parsePingRequest :: ParseASN1 ActionPayload
parsePingRequest = do
parseNull
pure PingRequestPayload
return PingRequestPayload
parsePingResponse :: ParseASN1 ActionPayload
parsePingResponse = onNextContainer Sequence $ do
handledNodes <- getMany parseNodeState
pure $ PingResponsePayload {
return $ PingResponsePayload {
pingNodeStates = handledNodes
}

View file

@@ -1,3 +1,5 @@
{-# LANGUAGE OverloadedStrings #-}
module Hash2Pub.DHTProtocol
( QueryResponse (..)
, queryLocalCache
@@ -15,26 +17,36 @@ module Hash2Pub.DHTProtocol
)
where
import qualified Data.Map as Map
import Data.Maybe (fromMaybe, maybe)
import qualified Data.Set as Set
import Data.Time.Clock.POSIX
import Network.Socket hiding (recv, recvFrom, send, sendTo)
import Network.Socket.ByteString
import Data.Maybe (maybe, fromMaybe)
import qualified Data.Set as Set
import qualified Data.Map as Map
import Data.Time.Clock.POSIX
import Network.Socket hiding (send, sendTo, recv, recvFrom)
import Network.Socket.ByteString
import System.Timeout
import Control.Monad.State.Strict
import Hash2Pub.FediChord (CacheEntry (..), NodeCache, NodeID,
NodeState (..),
cacheGetNodeStateUnvalidated,
cacheLookup, cacheLookupPred,
cacheLookupSucc, getPredecessors,
getSuccessors, localCompare,
putPredecessors, putSuccessors)
import Hash2Pub.FediChord
( NodeID
, NodeState (..)
, getSuccessors
, putSuccessors
, getPredecessors
, putPredecessors
, cacheGetNodeStateUnvalidated
, NodeCache
, CacheEntry(..)
, cacheLookup
, cacheLookupSucc
, cacheLookupPred
, localCompare
)
import Debug.Trace (trace)
import Debug.Trace (trace)
-- === queries ===
data QueryResponse = FORWARD (Set.Set RemoteCacheEntry) -- ^ return closest nodes from local cache.
data QueryResponse = FORWARD (Set.Set RemoteCacheEntry) -- ^return closest nodes from local cache.
-- whole cache entry is returned for making
-- the entry time stamp available to the
-- protocol serialiser
@@ -73,7 +85,7 @@ queryLocalCache ownState nCache lBestNodes targetID
-- === protocol serialisation data types
data Action =
QueryID
QueryID
| Join
| Leave
| Stabilise
@@ -82,32 +94,32 @@ data FediChordMessage =
data FediChordMessage =
Request {
requestID :: Integer
, sender :: NodeState
, parts :: Integer
, part :: Integer
requestID :: Integer
, sender :: NodeState
, parts :: Integer
, part :: Integer
-- ^ part starts at 0
, action :: Action
, payload :: Maybe ActionPayload
, action :: Action
, payload :: Maybe ActionPayload
}
| Response {
responseTo :: Integer
, senderID :: NodeID
, parts :: Integer
, part :: Integer
, action :: Action
, payload :: Maybe ActionPayload
responseTo :: Integer
, senderID :: NodeID
, parts :: Integer
, part :: Integer
, action :: Action
, payload :: Maybe ActionPayload
} deriving (Show, Eq)
data ActionPayload =
data ActionPayload =
QueryIDRequestPayload {
queryTargetID :: NodeID
, queryLBestNodes :: Integer
}
| JoinRequestPayload
| LeaveRequestPayload {
leaveSuccessors :: [NodeID]
, leavePredecessors :: [NodeID]
leaveSuccessors :: [NodeID]
, leavePredecessors :: [NodeID]
}
| StabiliseRequestPayload
| PingRequestPayload
@@ -115,14 +127,14 @@ data ActionPayload =
queryResult :: QueryResponse
}
| JoinResponsePayload {
joinSuccessors :: [NodeID]
, joinPredecessors :: [NodeID]
, joinCache :: [RemoteCacheEntry]
joinSuccessors :: [NodeID]
, joinPredecessors :: [NodeID]
, joinCache :: [RemoteCacheEntry]
}
| LeaveResponsePayload
| StabiliseResponsePayload {
stabiliseSuccessors :: [NodeID]
, stabilisePredecessors :: [NodeID]
stabiliseSuccessors :: [NodeID]
, stabilisePredecessors :: [NodeID]
}
| PingResponsePayload {
pingNodeStates :: [NodeState]
@@ -160,7 +172,7 @@ addCacheEntry :: RemoteCacheEntry -- ^ a remote cache entry received from netw
-> IO NodeCache -- ^ new node cache with the element inserted
addCacheEntry entry cache = do
now <- getPOSIXTime
pure $ addCacheEntryPure now entry cache
return $ addCacheEntryPure now entry cache
-- | pure version of 'addCacheEntry' with current time explicitly specified as argument
addCacheEntryPure :: POSIXTime -- ^ current time
@@ -186,7 +198,7 @@ deleteCacheEntry :: NodeID -- ^ID of the node to be deleted
deleteCacheEntry = Map.update modifier
where
modifier (ProxyEntry idPointer _) = Just (ProxyEntry idPointer Nothing)
modifier NodeEntry {} = Nothing
modifier NodeEntry {} = Nothing
-- | Mark a cache entry as verified after pinging it, possibly bumping its timestamp.
markCacheEntryAsVerified :: Maybe POSIXTime -- ^ the (current) timestamp to be
@@ -200,13 +212,59 @@ markCacheEntryAsVerified timestamp = Map.adjust adjustFunc
adjustFunc (ProxyEntry _ (Just entry)) = adjustFunc entry
adjustFunc entry = entry
-- ====== message send and receive operations ======
requestQueryID :: NodeState -> NodeID -> IO NodeState
-- 1. do a local lookup for the l closest nodes
-- 2. create l sockets
-- 3. send a message async concurrently to all l nodes
-- 4. collect the results, insert them into cache
-- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
requestQueryID ns targetID = do
cacheSnapshot <- readIORef $ getNodeCacheRef ns
let localResult = queryLocalCache ns cacheSnapshot (fromMaybe 1 $ getLNumBestNodes ns) targetID
-- FOUND can only be returned if targetID is owned by local node
case localResult of
FOUND thisNode -> return thisNode
FORWARD nodeSet ->
sockets <- mapM (\resultNode -> mkSendSocket (domain result) (dhtPort resultNode)) $ Set.toList nodeSet
-- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
responses = mapM
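
The numbered comments above outline the iterative lookup; the body underneath is still a fragment in this commit. As a hedged illustration of steps 3 and 4 only — queryAll and its callback are hypothetical names — the concurrent fan-out can be expressed with mapConcurrently from the async package that is already listed in build-depends:

    import Control.Concurrent.Async (mapConcurrently)
    import Data.Maybe (catMaybes)

    -- Hypothetical helper: query every candidate node concurrently and keep the
    -- answers that arrived; the caller would merge them into the node cache and
    -- decide whether another lookup round is needed.
    queryAll :: (node -> IO (Maybe response)) -> [node] -> IO [response]
    queryAll queryOne nodes = catMaybes <$> mapConcurrently queryOne nodes
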
sendRequestTo :: Int -- ^ timeout in seconds
-> Int -- ^ number of retries
-> FediChordMessage -- ^ the message to be sent
-> Socket -- ^ connected socket to use for sending
-> IO (Set.Set FediChordMessage) -- ^ responses
sendRequestTo timeout attempts msg sock = do
let requests = serialiseMessage 1200 msg
-- ToDo: make attempts and timeout configurable
attempts 3 . timeout 5000 $ do
where
-- feed in the state: state = messages not yet acknowledged, result = responses
sendAndAck :: Socket -> StateT (Map.Map Integer BS.ByteString) IO (Set.Set FediChordMessage)
sendAndAck sock = do
remainingSends <- get
sendMany sock $ Map.elems remainingSends
-- timeout per receive socket, afterwards catMaybes
-- important: packets may be duplicated, i.e. more ACKs than sent parts can arrive
replicateM
-- idea: send all parts at once
-- Set/ Map with unacked parts
-- then recv with timeout for |unackedParts| attempts, receive acked parts from set/ map
-- how to manage individual retries? nested "attempts"
-- | retry an IO action at most *i* times until it delivers a result
attempts :: Int -- ^ number of retries *i*
-> IO (Maybe a) -- ^ action to retry
-> IO (Maybe a) -- ^ result after at most *i* retries
attempts 0 _ = pure Nothing
attempts 0 _ = return Nothing
attempts i action = do
actionResult <- action
case actionResult of
Nothing -> attempts (i-1) action
Just res -> pure $ Just res
Nothing -> attempts (i-1) action
Just res -> return $ Just res
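
A usage note for the retry combinator above (hypothetical values; timeout comes from System.Timeout and expects microseconds, so a one-second limit is 1000000):

    import Network.Socket (Socket)
    import Network.Socket.ByteString (recv)
    import System.Timeout (timeout)
    import qualified Data.ByteString as BS

    -- Hypothetical usage: retry a blocking receive up to three times, each attempt
    -- bounded by a one-second timeout; Nothing means all attempts timed out.
    recvWithRetry :: Socket -> IO (Maybe BS.ByteString)
    recvWithRetry sock = attempts 3 (timeout 1000000 (recv sock 4096))
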

View file

@@ -1,6 +1,4 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE GeneralizedNewtypeDeriving, DataKinds, OverloadedStrings #-}
{- |
Module : FediChord
Description : An opinionated implementation of the EpiChord DHT by Leong et al.
@@ -47,30 +45,29 @@ module Hash2Pub.FediChord (
, cacheWriter
) where
import Control.Exception
import qualified Data.Map.Strict as Map
import Data.Maybe (fromMaybe, isJust, mapMaybe)
import Data.Time.Clock.POSIX
import Network.Socket
import qualified Data.Map.Strict as Map
import Network.Socket
import Data.Time.Clock.POSIX
import Control.Exception
import Data.Maybe (isJust, fromMaybe, mapMaybe)
-- for hashing and ID conversion
import Control.Concurrent.STM
import Control.Concurrent.STM.TQueue
import Control.Monad (forever)
import Crypto.Hash
import qualified Data.ByteArray as BA
import qualified Data.ByteString as BS
import qualified Data.ByteString.UTF8 as BSU
import Data.IORef
import Data.IP (IPv6, fromHostAddress6,
toHostAddress6)
import Data.Typeable (Typeable (..), typeOf)
import Data.Word
import qualified Network.ByteOrder as NetworkBytes
import Crypto.Hash
import Data.Word
import qualified Data.ByteString as BS
import qualified Data.ByteString.UTF8 as BSU
import qualified Data.ByteArray as BA
import qualified Network.ByteOrder as NetworkBytes
import Data.IP (IPv6, fromHostAddress6, toHostAddress6)
import Data.IORef
import Control.Concurrent.STM
import Control.Concurrent.STM.TQueue
import Control.Monad (forever)
import Data.Typeable (Typeable(..), typeOf)
import Hash2Pub.Utils
import Hash2Pub.Utils
import Debug.Trace (trace)
import Debug.Trace (trace)
-- define protocol constants
-- | static definition of ID length in bits
@@ -123,14 +120,14 @@ a `localCompare` b
-- | represents a node and all its important state
data NodeState = NodeState {
nid :: NodeID
, domain :: String
nid :: NodeID
, domain :: String
-- ^ full public domain name the node is reachable under
, ipAddr :: HostAddress6
, ipAddr :: HostAddress6
-- the node's public IPv6 address
, dhtPort :: PortNumber
, dhtPort :: PortNumber
-- ^ port of the DHT itself
, apPort :: Maybe PortNumber
, apPort :: Maybe PortNumber
-- ^ port of the ActivityPub relay and storage service
-- might have to be queried first
, vServerID :: Integer
@@ -145,32 +142,32 @@ data NodeState = NodeState {
-- | encapsulates all data and parameters that are not present for remote nodes
data InternalNodeState = InternalNodeState {
nodeCache :: IORef NodeCache
nodeCache :: IORef NodeCache
-- ^ EpiChord node cache with expiry times for nodes
-- as the map is ordered, lookups for the closest preceding node can be done using @lookupLT@.
-- encapsulated into an IORef for allowing concurrent reads without locking
, cacheWriteQueue :: TQueue (NodeCache -> NodeCache)
, cacheWriteQueue :: TQueue (NodeCache -> NodeCache)
-- ^ cache updates are not written directly to the 'nodeCache' but queued and
-- only processed by a single writer thread to prevent lost updates.
-- All nodeCache modifying functions have to be partially applied enough before
-- being put into the queue.
--
, successors :: [NodeID] -- could be a set instead as these are ordered as well
, successors :: [NodeID] -- could be a set instead as these are ordered as well
-- ^ successor nodes in ascending order by distance
, predecessors :: [NodeID]
, predecessors :: [NodeID]
-- ^ predecessor nodes in ascending order by distance
----- protocol parameters -----
-- TODO: evaluate moving these somewhere else
, kNeighbours :: Int
, kNeighbours :: Int
-- ^ desired length of predecessor and successor list
-- needs to be parameterisable for simulation purposes
, lNumBestNodes :: Int
, lNumBestNodes :: Int
-- ^ number of best next hops to provide
-- needs to be parameterisable for simulation purposes
, pNumParallelQueries :: Int
-- ^ number of parallel sent queries
-- needs to be parameterisable for simulation purposes
, jEntriesPerSlice :: Int
, jEntriesPerSlice :: Int
-- ^ number of desired entries per cache slice
-- needs to be parameterisable for simulation purposes
} deriving (Show, Eq)
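
As the cacheWriteQueue comment above says, updates are enqueued as partially applied NodeCache -> NodeCache functions and folded in by a single writer thread (see cacheWriter further down in this file). A hedged sketch of what enqueueing an insertion could look like — queueAddEntry is a hypothetical helper, not part of this commit, and assumes the addCacheEntryPure signature shown in DHTProtocol:

    import Control.Concurrent.STM (atomically)
    import Control.Concurrent.STM.TQueue (TQueue, writeTQueue)
    import Data.Time.Clock.POSIX (getPOSIXTime)

    -- Hypothetical helper: turn a received cache entry into a pure update function
    -- and hand it to the single writer thread via the queue.
    queueAddEntry :: TQueue (NodeCache -> NodeCache) -> RemoteCacheEntry -> IO ()
    queueAddEntry queue entry = do
        now <- getPOSIXTime
        atomically $ writeTQueue queue (addCacheEntryPure now entry)
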
@@ -251,10 +248,10 @@ data ProxyDirection = Backwards | Forwards deriving (Show, Eq)
instance Enum ProxyDirection where
toEnum (-1) = Backwards
toEnum 1 = Forwards
toEnum _ = error "no such ProxyDirection"
toEnum 1 = Forwards
toEnum _ = error "no such ProxyDirection"
fromEnum Backwards = - 1
fromEnum Forwards = 1
fromEnum Forwards = 1
--- useful function for getting entries for a full cache transfer
cacheEntries :: NodeCache -> [CacheEntry]
@@ -275,7 +272,7 @@ cacheLookup :: NodeID -- ^lookup key
-> Maybe CacheEntry
cacheLookup key cache = case Map.lookup key cache of
Just (ProxyEntry _ result) -> result
res -> res
res -> res
-- | a wrapper around lookup functions, making the lookup redirectable by a @ProxyEntry@
-- to simulate a modular ring
@@ -283,12 +280,12 @@ lookupWrapper :: (NodeID -> NodeCache -> Maybe (NodeID, CacheEntry)) -> (NodeID
lookupWrapper f fRepeat direction key cache =
case f key cache of
-- the proxy entry found holds a
Just (_, ProxyEntry _ (Just entry@NodeEntry{})) -> Just entry
Just (_, (ProxyEntry _ (Just entry@NodeEntry{}))) -> Just entry
-- proxy entry holds another proxy entry, this should not happen
Just (_, ProxyEntry _ (Just (ProxyEntry _ _))) -> Nothing
Just (_, (ProxyEntry _ (Just (ProxyEntry _ _)))) -> Nothing
-- proxy entry without own entry is a pointer on where to continue
-- if lookup direction is the same as pointer direction: follow pointer
Just (foundKey, ProxyEntry (pointerID, pointerDirection) Nothing) ->
Just (foundKey, (ProxyEntry (pointerID, pointerDirection) Nothing)) ->
let newKey = if pointerDirection == direction
then pointerID
else foundKey + (fromInteger . toInteger . fromEnum $ direction)
@@ -325,17 +322,17 @@ cacheLookupPred = lookupWrapper Map.lookupLT Map.lookupLE Backwards
cacheGetNodeStateUnvalidated :: CacheEntry -> NodeState
cacheGetNodeStateUnvalidated (NodeEntry _ nState _) = nState
cacheGetNodeStateUnvalidated (ProxyEntry _ (Just entry)) = cacheGetNodeStateUnvalidated entry
cacheGetNodeStateUnvalidated _ = error "trying to pure empty node state, please report a bug"
cacheGetNodeStateUnvalidated _ = error "trying to return empty node state, please report a bug"
-- | converts a 'HostAddress6' IP address to a big-endian strict ByteString
ipAddrAsBS :: HostAddress6 -> BS.ByteString
ipAddrAsBS (a, b, c, d) = mconcat $ fmap NetworkBytes.bytestring32 [a, b, c, d]
ipAddrAsBS (a, b, c, d) = mconcat $ map NetworkBytes.bytestring32 [a, b, c, d]
-- | converts a ByteString in big endian order to an IPv6 address 'HostAddress6'
bsAsIpAddr :: BS.ByteString -> HostAddress6
bsAsIpAddr bytes = (a,b,c,d)
where
a:b:c:d:_ = fmap NetworkBytes.word32 . chunkBytes 4 $ bytes
a:b:c:d:_ = map NetworkBytes.word32 . chunkBytes 4 $ bytes
-- | generates a 256 bit long NodeID using SHAKE128, represented as ByteString
@@ -347,7 +344,7 @@ genNodeIDBS ip nodeDomain vserver =
hashIpaddrUpper `BS.append` hashID nodeDomain' `BS.append` hashIpaddLower
where
vsBS = BS.pack [vserver] -- attention: only works for vserver IDs up to 255
ipaddrNet = BS.take 8 (ipAddrAsBS ip) `BS.append` vsBS
ipaddrNet = (BS.take 8 $ ipAddrAsBS ip) `BS.append` vsBS
nodeDomain' = BSU.fromString nodeDomain `BS.append` vsBS
hashID bstr = BS.pack . BA.unpack $ (hash bstr :: Digest (SHAKE128 128))
(hashIpaddrUpper, hashIpaddLower) = BS.splitAt 64 $ hashID ipaddrNet
@@ -382,7 +379,7 @@ byteStringToUInteger bs = sum $ parsedBytes 0 bs
Just (bs', w) -> parseWithOffset offset w : parsedBytes (offset+1) bs'
parseWithOffset :: Integer -> Word8 -> Integer
parseWithOffset 0 word = toInteger word -- a shift of 0 is always 0
parseWithOffset 0 word = toInteger word -- a shift of 0 is always 0
parseWithOffset offset word = toInteger word * 2^(8 * offset)
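
A quick sanity check of the big-endian interpretation above (assuming byteStringToUInteger is in scope, e.g. from a test module; the values are purely illustrative):

    import qualified Data.ByteString as BS

    -- the last byte gets weight 2^0, the byte before it 2^8, and so on
    bigEndianExamples :: Bool
    bigEndianExamples =
           byteStringToUInteger (BS.pack [0x01, 0x00]) == 256
        && byteStringToUInteger (BS.pack [0x01, 0x02, 0x03]) == 0x010203  -- 66051
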
@@ -394,7 +391,7 @@ byteStringToUInteger bs = sum $ parsedBytes 0 bs
--checkCacheSlices :: NodeState -> IO [()]
--checkCacheSlices state = case getNodeCache state of
-- -- don't do anything on nodes without a cache
-- Nothing -> pure [()]
-- Nothing -> return [()]
-- Just cache' -> checkSlice jEntries (nid state) startBound lastSucc =<< readIORef cache'
-- -- TODO: do the same for predecessors
-- where
@@ -405,7 +402,7 @@ byteStringToUInteger bs = sum $ parsedBytes 0 bs
-- checkSlice _ _ _ Nothing _ = []
-- checkSlice j ownID upperBound (Just lastSuccNode) cache
-- | upperBound < lastSuccNode = []
-- | otherwise =
-- | otherwise =
-- -- continuously half the DHT namespace, take the upper part as a slice,
-- -- check for existing entries in that slice and create a lookup action
-- -- and recursively do this on the lower half.
@@ -418,10 +415,10 @@ byteStringToUInteger bs = sum $ parsedBytes 0 bs
-- -- TODO: replace empty IO actions with actual lookups to middle of slice
-- -- TODO: validate ID before adding to cache
-- case Map.lookupLT upperBound cache of
-- Nothing -> pure () : checkSlice j ownID lowerBound (Just lastSuccNode) cache
-- Nothing -> return () : checkSlice j ownID lowerBound (Just lastSuccNode) cache
-- Just (matchID, _) ->
-- if
-- matchID <= lowerBound then pure () : checkSlice j ownID lowerBound (Just lastSuccNode) cache
-- matchID <= lowerBound then return () : checkSlice j ownID lowerBound (Just lastSuccNode) cache
-- else
-- checkSlice j ownID lowerBound (Just lastSuccNode) cache
@@ -431,8 +428,8 @@ byteStringToUInteger bs = sum $ parsedBytes 0 bs
-- | configuration values used for initialising the FediChord DHT
data FediChordConf = FediChordConf {
confDomain :: String
, confIP :: HostAddress6
confDomain :: String
, confIP :: HostAddress6
, confDhtPort :: Int
} deriving (Show, Eq)
@@ -442,7 +439,7 @@ fediChordInit :: FediChordConf -> IO (Socket, NodeState)
fediChordInit conf = do
initialState <- nodeStateInit conf
serverSock <- mkServerSocket (ipAddr initialState) (dhtPort initialState)
pure (serverSock, initialState)
return (serverSock, initialState)
-- | initialises the 'NodeState' for this local node.
-- Separated from 'fediChordInit' to be usable in tests.
@@ -470,7 +467,7 @@ nodeStateInit conf = do
, pNumParallelQueries = 2
, jEntriesPerSlice = 2
}
pure initialState
return initialState
--fediChordJoin :: NodeState -- ^ the local 'NodeState'
-- -> (String, PortNumber) -- ^ domain and port of a bootstrapping node
@@ -489,13 +486,13 @@ cacheWriter :: NodeState -> IO ()
cacheWriter ns = do
let writeQueue' = getCacheWriteQueue ns
case writeQueue' of
Nothing -> pure ()
Nothing -> return ()
Just writeQueue -> forever $ do
f <- atomically $ readTQueue writeQueue
let
refModifier :: NodeCache -> (NodeCache, ())
refModifier nc = (f nc, ())
maybe (pure ()) (
maybe (return ()) (
\ref -> atomicModifyIORef' ref refModifier
) $ getNodeCacheRef ns
@@ -521,7 +518,7 @@ mkServerSocket ip port = do
sock <- socket AF_INET6 Datagram defaultProtocol
setSocketOption sock IPv6Only 1
bind sock sockAddr
pure sock
return sock
-- | create a UDP datagram socket, connected to a destination.
-- The socket gets an arbitrary free local port assigned.
@@ -532,4 +529,4 @@ mkSendSocket dest destPort = do
destAddr <- addrAddress <$> resolve (Just dest) (Just destPort)
sendSock <- socket AF_INET6 Datagram defaultProtocol
setSocketOption sendSock IPv6Only 1
pure sendSock
return sendSock

View file

@@ -1,10 +1,10 @@
module Main where
import Control.Concurrent
import Data.IP (IPv6, toHostAddress6)
import System.Environment
import System.Environment
import Data.IP (IPv6, toHostAddress6) -- iproute, just for IPv6 string parsing
import Control.Concurrent
import Hash2Pub.FediChord
import Hash2Pub.FediChord
main :: IO ()
main = do
@@ -20,12 +20,12 @@ main = do
-- idea: list of bootstrapping nodes, try joining within a timeout
-- stop main thread from terminating during development
getChar
pure ()
return ()
readConfig :: IO FediChordConf
readConfig = do
confDomainString : ipString : portString : _ <- getArgs
pure $ FediChordConf {
return $ FediChordConf {
confDomain = confDomainString
, confIP = toHostAddress6 . read $ ipString
, confDhtPort = read portString

View file

@@ -2,11 +2,11 @@
module Hash2Pub.Utils where
import qualified Data.ByteString as BS
import qualified Data.Set as Set
import qualified Data.Set as Set
-- |wraps a list into a Maybe, by replacing empty lists with Nothing
maybeEmpty :: [a] -> Maybe [a]
maybeEmpty [] = Nothing
maybeEmpty [] = Nothing
maybeEmpty nonemptyList = Just nonemptyList
-- | Chop a list into sublists of i elements. The last sublist might contain
@@ -15,7 +15,7 @@ chunksOf :: Int -> [a] -> [[a]]
chunksOf i xs =
case splitAt i xs of
(a, []) -> [a]
(a, b) -> a : chunksOf i b
(a, b) -> a : chunksOf i b
-- | Chop a 'BS.ByteString' into list of substrings of i elements. The last
@@ -24,7 +24,7 @@ chunkBytes :: Int -> BS.ByteString -> [BS.ByteString]
chunkBytes i xs =
case BS.splitAt i xs of
(a, "") -> [a]
(a, b) -> a : chunkBytes i b
(a, b) -> a : chunkBytes i b
-- | Chop a 'Set.Set' into a list of disjoint subsets of i elements. The last
-- subset might contain less than i elements.
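
For reference, the intended behaviour of the chunking helpers above, written as a small check (illustrative values; the ByteString literals assume OverloadedStrings):

    chunkExamples :: Bool
    chunkExamples =
           chunksOf 2 [1, 2, 3, 4, 5 :: Int] == [[1, 2], [3, 4], [5]]
        && chunkBytes 3 "abcdefgh" == ["abc", "def", "gh"]
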

View file

@@ -1,20 +1,20 @@
{-# LANGUAGE OverloadedStrings #-}
module Main where
import qualified Data.ASN1.Encoding as ASN1 -- asn1-encoding package
import qualified Data.ASN1.BinaryEncoding as ASN1
import qualified Data.ASN1.Encoding as ASN1
import qualified Data.ASN1.Error as ASN1
import qualified Data.ASN1.Parse as ASN1P
import qualified Data.ASN1.Types as ASN1
import qualified Data.ByteString as BS
import Data.Maybe (fromMaybe)
import Debug.Trace (trace)
import qualified Data.ASN1.Error as ASN1
import qualified Data.ASN1.Types as ASN1 -- asn1-types package
import qualified Data.ASN1.Parse as ASN1P
import qualified Data.ByteString as BS
import Data.Maybe (fromMaybe)
import Debug.Trace (trace)
-- import Hash2Pub.Fedichord
-- encoding values as ASN.1 types is done using Data.ASN1.Prim
someASN1 :: [ASN1.ASN1]
someASN1 = ASN1.Start ASN1.Sequence : ASN1.ASN1String (ASN1.asn1CharacterString ASN1.Visible domain) : ASN1.ASN1String (ASN1.asn1CharacterString ASN1.UTF8 unicode) : fmap ASN1.IntVal xs <> [ASN1.End ASN1.Sequence]
someASN1 = ASN1.Start ASN1.Sequence : ASN1.ASN1String (ASN1.asn1CharacterString ASN1.Visible domain) : ASN1.ASN1String (ASN1.asn1CharacterString ASN1.UTF8 unicode) : map ASN1.IntVal xs ++ [ASN1.End ASN1.Sequence]
where
domain = "domains.are.ascii.on.ly"
unicode = "Hähä, but unicode string!"
@@ -27,12 +27,12 @@ derToAsn1 :: BS.ByteString -> Either ASN1.ASN1Error [ASN1.ASN1]
derToAsn1 = ASN1.decodeASN1' ASN1.DER
getUnicodeField :: [ASN1.ASN1] -> String
getUnicodeField (ASN1.Start ASN1.Sequence : _ : ASN1.ASN1String strASN1 : _) = fromMaybe "" $ ASN1.asn1CharacterToString strASN1
getUnicodeField ((ASN1.Start ASN1.Sequence) : _ : (ASN1.ASN1String strASN1) : _) = fromMaybe "" $ ASN1.asn1CharacterToString strASN1
testParser :: ASN1P.ParseASN1 String
testParser = do
foo <- ASN1P.onNextContainer ASN1.Sequence getAll
pure $ show foo
return $ show foo
getAll :: ASN1P.ParseASN1 [ASN1.ASN1]
getAll = ASN1P.getMany ASN1P.getNext

View file

@@ -1,32 +0,0 @@
#!/usr/bin/env bash
set -euo pipefail
function ls-source-files {
git ls-files "app/*.hs" "src/*.hs" "test/*.hs"
}
function check-git-status {
[ "$(git status -s '*.hs' | wc -l)" == "0" ]
}
function stylish {
stylish-haskell -i $(ls-source-files)
}
if check-git-status
then
echo "Running stylish-haskell..."
stylish
echo "Done."
if check-git-status
then
echo "OK, impeccable style."
else
echo "KO! Lack of style on those files:"
git status -sb
exit 1
fi
else
echo "git status not clean, aborting"
fi

View file

@@ -1,20 +1,20 @@
{-# LANGUAGE OverloadedStrings #-}
module FediChordSpec where
import Control.Exception
import Data.ASN1.Parse (runParseASN1)
import qualified Data.ByteString as BS
import Data.IORef
import qualified Data.Map.Strict as Map
import Data.Maybe (fromJust)
import qualified Data.Set as Set
import Data.Time.Clock.POSIX
import Network.Socket
import Test.Hspec
import Test.Hspec
import Control.Exception
import Network.Socket
import Data.Maybe (fromJust)
import qualified Data.Map.Strict as Map
import qualified Data.ByteString as BS
import qualified Data.Set as Set
import Data.ASN1.Parse (runParseASN1)
import Data.Time.Clock.POSIX
import Data.IORef
import Hash2Pub.ASN1Coding
import Hash2Pub.DHTProtocol
import Hash2Pub.FediChord
import Hash2Pub.FediChord
import Hash2Pub.DHTProtocol
import Hash2Pub.ASN1Coding
spec :: Spec
spec = do
@@ -96,19 +96,19 @@ spec = do
-- ignore empty proxy elements in initial cache
nid . cacheGetNodeStateUnvalidated <$> cacheLookupPred (exampleID + 10) emptyCache `shouldBe` Nothing
nid . cacheGetNodeStateUnvalidated <$> cacheLookupSucc exampleID emptyCache `shouldBe` Nothing
-- given situation: 0 < anotherNode < nid exampleLocalNode < maxBound
-- first try non-modular queries between the 2 stored nodes
nid . cacheGetNodeStateUnvalidated <$> cacheLookupPred (exampleID + 10) newCache `shouldBe` Just exampleID
nid . cacheGetNodeStateUnvalidated <$> cacheLookupSucc exampleID newCache `shouldBe` Just exampleID
nid . cacheGetNodeStateUnvalidated <$> cacheLookupPred (exampleID + 10) newCache `shouldBe` Just exampleID
nid . cacheGetNodeStateUnvalidated <$> cacheLookupSucc exampleID newCache `shouldBe` Just exampleID
nid . cacheGetNodeStateUnvalidated <$> cacheLookupSucc (exampleID + 10) newCache `shouldBe` Just anotherID
-- queries that require a (pseudo)modular structure
nid . cacheGetNodeStateUnvalidated <$> cacheLookupPred (exampleID - 2) newCache `shouldBe` Just anotherID
nid . cacheGetNodeStateUnvalidated <$> cacheLookupSucc (anotherID + 2) newCache `shouldBe` Just exampleID
nid . cacheGetNodeStateUnvalidated <$> cacheLookupSucc (anotherID + 2) newCache `shouldBe` Just exampleID
-- now store a node in one of the ProxyEntries
let cacheWithProxyNodeEntry = addCacheEntryPure 10 (RemoteCacheEntry maxNode 10) newCache
nid . cacheGetNodeStateUnvalidated <$> cacheLookupPred (exampleID - 2) cacheWithProxyNodeEntry `shouldBe` Just maxBound
nid . cacheGetNodeStateUnvalidated <$> cacheLookupSucc (anotherID + 2) cacheWithProxyNodeEntry `shouldBe` Just maxBound
nid . cacheGetNodeStateUnvalidated <$> cacheLookupSucc (anotherID + 2) cacheWithProxyNodeEntry `shouldBe` Just maxBound
it "entries can be deleted" $ do
let
nC = addCacheEntryPure 10 (RemoteCacheEntry maxNode 10) newCache
@@ -123,7 +123,7 @@ spec = do
nid1 = toNodeID 2^(23::Integer)+1
node1 = do
eln <- exampleLocalNode -- is at 2^23.00000017198264 = 8388609
pure $ putPredecessors [nid4] $ eln {nid = nid1}
return $ putPredecessors [nid4] $ eln {nid = nid1}
nid2 = toNodeID 2^(230::Integer)+12
node2 = exampleNodeState { nid = nid2}
nid3 = toNodeID 2^(25::Integer)+10
@@ -156,7 +156,7 @@ spec = do
describe "Messages can be encoded to and decoded from ASN.1" $ do
-- define test messages
let
someNodeIDs = fmap fromInteger [3..12]
someNodeIDs = map fromInteger [3..12]
qidReqPayload = QueryIDRequestPayload {
queryTargetID = nid exampleNodeState
, queryLBestNodes = 3

View file

@@ -1,8 +1,8 @@
module Main (main) where
import Test.Hspec
import qualified FediChordSpec
import Test.Hspec
main :: IO ()
main = hspec $
main = hspec $ do
describe "FediChord tests" FediChordSpec.spec