From b8be20b86e03dec07cd44ecd06ff980017ca401a Mon Sep 17 00:00:00 2001
From: Trolli Schmittlauch
Date: Sun, 17 May 2020 01:24:56 +0200
Subject: [PATCH] begin implementation of message sending

---
 Hash2Pub/Hash2Pub.cabal              |  2 +-
 Hash2Pub/src/Hash2Pub/DHTProtocol.hs | 48 ++++++++++++++++++++++++++++
 2 files changed, 49 insertions(+), 1 deletion(-)

diff --git a/Hash2Pub/Hash2Pub.cabal b/Hash2Pub/Hash2Pub.cabal
index 084b096..e3aa4c1 100644
--- a/Hash2Pub/Hash2Pub.cabal
+++ b/Hash2Pub/Hash2Pub.cabal
@@ -46,7 +46,7 @@ category:            Network
 extra-source-files:  CHANGELOG.md
 
 common deps
-  build-depends:     base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute
+  build-depends:     base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl
   ghc-options: -Wall
 
diff --git a/Hash2Pub/src/Hash2Pub/DHTProtocol.hs b/Hash2Pub/src/Hash2Pub/DHTProtocol.hs
index a90c304..4688d39 100644
--- a/Hash2Pub/src/Hash2Pub/DHTProtocol.hs
+++ b/Hash2Pub/src/Hash2Pub/DHTProtocol.hs
@@ -23,6 +23,8 @@ import qualified Data.Map as Map
 import Data.Time.Clock.POSIX
 import Network.Socket hiding (send, sendTo, recv, recvFrom)
 import Network.Socket.ByteString
+import System.Timeout
+import Control.Monad.State.Strict
 
 import Hash2Pub.FediChord
     ( NodeID
@@ -210,6 +212,52 @@ markCacheEntryAsVerified timestamp = Map.adjust adjustFunc
     adjustFunc (ProxyEntry _ (Just entry)) = adjustFunc entry
     adjustFunc entry                       = entry
 
+-- ====== message send and receive operations ======
+
+requestQueryID :: NodeState -> NodeID -> IO NodeState
+-- 1. do a local lookup for the l closest nodes
+-- 2. create l sockets
+-- 3. send a message async concurrently to all l nodes
+-- 4. collect the results, insert them into cache
+-- 5. repeat until FOUND (problem: new entries are not necessarily in the cache yet, so explicitly compare against the closer results)
+requestQueryID ns targetID = do
+    cacheSnapshot <- readIORef $ getNodeCacheRef ns
+    let localResult = queryLocalCache ns cacheSnapshot (fromMaybe 1 $ getLNumBestNodes ns) targetID
+    -- FOUND can only be returned if targetID is owned by the local node
+    case localResult of
+        FOUND thisNode -> return thisNode
+        FORWARD nodeSet -> do
+            sockets <- mapM (\resultNode -> mkSendSocket (domain resultNode) (dhtPort resultNode)) $ Set.toList nodeSet
+            -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
+            responses <- mapM
+
+sendRequestTo :: Int                -- ^ timeout in seconds
+              -> Int                -- ^ number of retries
+              -> FediChordMessage   -- ^ the message to be sent
+              -> Socket             -- ^ connected socket to use for sending
+              -> IO (Set.Set FediChordMessage) -- ^ responses
+sendRequestTo timeoutSecs numAttempts msg sock = do
+    let requests = serialiseMessage 1200 msg
+    -- ToDo: make attempts and timeout configurable
+    responses <- attempts 3 . timeout 5000 $ evalStateT (sendAndAck sock) requests
+    pure $ fromMaybe Set.empty responses
+    where
+        -- state passed in: the message parts not yet ACKed; result: the responses
+        sendAndAck :: Socket -> StateT (Map.Map Integer BS.ByteString) IO (Set.Set FediChordMessage)
+        sendAndAck sock = do
+            remainingSends <- get
+            sendMany sock $ Map.elems remainingSends
+            -- timeout per receive on the socket, then catMaybes the results
+            -- important: packets can be duplicated, i.e. more ACKs than sent parts may arrive
+            replicateM
+
+
+
+-- idea: send all parts at once
+-- Set/Map with unACKed parts
+-- then recv with timeout for |unackedParts| attempts, remove ACKed parts from the set/map
+-- how to manage individual retries? nested "attempts"
+
 -- | retry an IO action at most *i* times until it delivers a result
 attempts :: Int          -- ^ number of retries *i*
          -> IO (Maybe a) -- ^ action to retry
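The FORWARD branch of requestQueryID is still a stub in this patch. A rough, non-runnable sketch of how the fan-out (steps 2 and 3 of the comment above the function) could continue: mkQueryIDRequest is a hypothetical helper for building the lookup message and is not part of this patch, mapConcurrently comes from the async package already listed in build-depends, and the hardcoded 5000/3 just mirror the placeholder timeout and retry values from sendRequestTo.

        FORWARD nodeSet -> do
            -- step 2: one connected socket per candidate node
            sockets <- mapM (\resultNode -> mkSendSocket (domain resultNode) (dhtPort resultNode)) $ Set.toList nodeSet
            -- step 3: query all candidates concurrently and merge their responses;
            -- mapConcurrently is from Control.Concurrent.Async,
            -- mkQueryIDRequest is a hypothetical message constructor
            responses <- Set.unions <$> mapConcurrently (sendRequestTo 5000 3 (mkQueryIDRequest ns targetID)) sockets
            -- steps 4 and 5 (insert the received nodes into the cache, compare with
            -- the closer results, repeat until FOUND) are not sketched here
            pure ns  -- placeholder return value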
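The notes at the end of the hunk describe the receive loop still missing from sendAndAck: keep the unACKed parts in a Map, make one receive attempt per outstanding part, and drop a part once its ACK arrives. A small self-contained sketch of that bookkeeping, in which recvAckedPart is a placeholder for receiving and decoding a single ACK (the real code would recv on the socket and deserialise a FediChordMessage):

import           Control.Monad             (replicateM)
import qualified Data.ByteString           as BS
import qualified Data.Map                  as Map
import           Data.Maybe                (catMaybes)
import           Network.Socket            (Socket)
import           Network.Socket.ByteString (sendMany)

-- | Send all remaining parts at once, then make one receive attempt per
--   outstanding part and delete every part whose ACK arrived.
--   Returns the parts that are still unACKed and would need to be retried.
sendAndCollectAcks :: Socket
                   -> (Socket -> IO (Maybe Integer))     -- ^ placeholder: receive one ACKed part number
                   -> Map.Map Integer BS.ByteString      -- ^ unACKed message parts, keyed by part number
                   -> IO (Map.Map Integer BS.ByteString) -- ^ parts still unACKed afterwards
sendAndCollectAcks sock recvAckedPart remainingSends = do
    sendMany sock $ Map.elems remainingSends
    -- duplicated ACKs are harmless here: deleting an already-removed key is a no-op
    ackedParts <- replicateM (Map.size remainingSends) (recvAckedPart sock)
    pure $ foldr Map.delete remainingSends (catMaybes ackedParts)

Retrying would then wrap this in the attempts/timeout combination used in sendRequestTo, re-sending only the parts still left in the map.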
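sendRequestTo relies on the attempts helper whose signature forms the trailing context of the hunk. For reference, a retry combinator with that shape can be as small as the following sketch; it is not necessarily the implementation already present in DHTProtocol.hs:

-- | retry an IO action at most *i* times until it delivers a Just result
attempts :: Int            -- ^ number of retries *i*
         -> IO (Maybe a)   -- ^ action to retry
         -> IO (Maybe a)   -- ^ first Just result, or Nothing after *i* tries
attempts 0 _      = pure Nothing
attempts i action = action >>= maybe (attempts (i - 1) action) (pure . Just)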