forked from schmittlauch/Hash2Pub
stylish run
This commit is contained in:
parent
ab9d593a1b
commit
1a962f1500
|
@ -1,16 +1,16 @@
|
||||||
{-# LANGUAGE OverloadedStrings #-}
|
{-# LANGUAGE OverloadedStrings #-}
|
||||||
|
|
||||||
module Main where
|
module Main where
|
||||||
|
|
||||||
import System.Random
|
import Control.Concurrent
|
||||||
import Control.Concurrent
|
import Control.Monad (forM_)
|
||||||
import Control.Monad (forM_)
|
import Control.Monad.IO.Class
|
||||||
import Control.Monad.State.Class
|
import Control.Monad.State.Class
|
||||||
import Control.Monad.State.Strict (evalStateT)
|
import Control.Monad.State.Strict (evalStateT)
|
||||||
import Control.Monad.IO.Class
|
import qualified Network.HTTP.Client as HTTP
|
||||||
import qualified Network.HTTP.Client as HTTP
|
import System.Random
|
||||||
|
|
||||||
import Hash2Pub.PostService (clientPublishPost, Hashtag)
|
import Hash2Pub.PostService (Hashtag, clientPublishPost)
|
||||||
|
|
||||||
-- placeholder post data definition
|
-- placeholder post data definition
|
||||||
|
|
||||||
|
|
|
@ -49,8 +49,8 @@ import Control.Concurrent.STM.TQueue
|
||||||
import Control.Concurrent.STM.TVar
|
import Control.Concurrent.STM.TVar
|
||||||
import Control.Exception
|
import Control.Exception
|
||||||
import Control.Monad (foldM, forM, forM_, void, when)
|
import Control.Monad (foldM, forM, forM_, void, when)
|
||||||
import Control.Monad.IO.Class (MonadIO(..))
|
import Control.Monad.Except (MonadError (..), runExceptT)
|
||||||
import Control.Monad.Except (MonadError(..), runExceptT)
|
import Control.Monad.IO.Class (MonadIO (..))
|
||||||
import qualified Data.ByteString as BS
|
import qualified Data.ByteString as BS
|
||||||
import Data.Either (rights)
|
import Data.Either (rights)
|
||||||
import Data.Foldable (foldl', foldr', foldrM)
|
import Data.Foldable (foldl', foldr', foldrM)
|
||||||
|
@ -516,7 +516,7 @@ requestJoin toJoinOn ownStateSTM = do
|
||||||
([], Set.empty, Set.empty)
|
([], Set.empty, Set.empty)
|
||||||
responses
|
responses
|
||||||
-- sort, slice and set the accumulated successors and predecessors
|
-- sort, slice and set the accumulated successors and predecessors
|
||||||
-- the contacted node itself is a successor as well and, with few
|
-- the contacted node itself is a successor as well and, with few
|
||||||
-- nodes, can be a predecessor as well
|
-- nodes, can be a predecessor as well
|
||||||
newState = setSuccessors (toRemoteNodeState toJoinOn:Set.elems succAccSet) . setPredecessors (toRemoteNodeState toJoinOn:Set.elems predAccSet) $ stateSnap
|
newState = setSuccessors (toRemoteNodeState toJoinOn:Set.elems succAccSet) . setPredecessors (toRemoteNodeState toJoinOn:Set.elems predAccSet) $ stateSnap
|
||||||
writeTVar ownStateSTM newState
|
writeTVar ownStateSTM newState
|
||||||
|
@ -596,7 +596,7 @@ sendQueryIdMessages targetID ns lParam targets = do
|
||||||
now <- getPOSIXTime
|
now <- getPOSIXTime
|
||||||
-- collect cache entries from all responses
|
-- collect cache entries from all responses
|
||||||
foldrM (\resp acc -> do
|
foldrM (\resp acc -> do
|
||||||
let
|
let
|
||||||
responseResult = queryResult <$> payload resp
|
responseResult = queryResult <$> payload resp
|
||||||
entrySet = case responseResult of
|
entrySet = case responseResult of
|
||||||
Just (FOUND result1) -> Set.singleton (RemoteCacheEntry result1 now)
|
Just (FOUND result1) -> Set.singleton (RemoteCacheEntry result1 now)
|
||||||
|
@ -609,14 +609,14 @@ sendQueryIdMessages targetID ns lParam targets = do
|
||||||
pure $ case acc of
|
pure $ case acc of
|
||||||
-- once a FOUND as been encountered, return this as a result
|
-- once a FOUND as been encountered, return this as a result
|
||||||
FOUND{} -> acc
|
FOUND{} -> acc
|
||||||
FORWARD accSet
|
FORWARD accSet
|
||||||
| maybe False isFound responseResult -> fromJust responseResult
|
| maybe False isFound responseResult -> fromJust responseResult
|
||||||
| otherwise -> FORWARD $ entrySet `Set.union` accSet
|
| otherwise -> FORWARD $ entrySet `Set.union` accSet
|
||||||
|
|
||||||
) (FORWARD Set.empty) responses
|
) (FORWARD Set.empty) responses
|
||||||
where
|
where
|
||||||
isFound FOUND{} = True
|
isFound FOUND{} = True
|
||||||
isFound _ = False
|
isFound _ = False
|
||||||
|
|
||||||
-- | Create a QueryID message to be supplied to 'sendRequestTo'
|
-- | Create a QueryID message to be supplied to 'sendRequestTo'
|
||||||
lookupMessage :: Integral i
|
lookupMessage :: Integral i
|
||||||
|
|
|
@ -504,7 +504,7 @@ stabiliseThread nsSTM = forever $ do
|
||||||
forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
|
forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
|
||||||
ns' <- readTVarIO nsSTM
|
ns' <- readTVarIO nsSTM
|
||||||
nextEntry <- runExceptT . requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns')
|
nextEntry <- runExceptT . requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns')
|
||||||
either
|
either
|
||||||
(const $ pure ())
|
(const $ pure ())
|
||||||
(\entry -> atomically $ do
|
(\entry -> atomically $ do
|
||||||
latestNs <- readTVar nsSTM
|
latestNs <- readTVar nsSTM
|
||||||
|
@ -782,7 +782,7 @@ updateLookupCache nodeSTM keyToLookup = do
|
||||||
-- TODO: better retry management, because having no vserver joined yet should
|
-- TODO: better retry management, because having no vserver joined yet should
|
||||||
-- be treated differently than other reasons for not getting a result.
|
-- be treated differently than other reasons for not getting a result.
|
||||||
newResponsible <- runExceptT $ requestQueryID n keyToLookup
|
newResponsible <- runExceptT $ requestQueryID n keyToLookup
|
||||||
either
|
either
|
||||||
(const $ pure Nothing)
|
(const $ pure Nothing)
|
||||||
(\result -> do
|
(\result -> do
|
||||||
let newEntry = (getDomain result, getServicePort result)
|
let newEntry = (getDomain result, getServicePort result)
|
||||||
|
|
|
@ -11,27 +11,27 @@ module Hash2Pub.PostService where
|
||||||
import Control.Concurrent
|
import Control.Concurrent
|
||||||
import Control.Concurrent.Async
|
import Control.Concurrent.Async
|
||||||
import Control.Concurrent.STM
|
import Control.Concurrent.STM
|
||||||
import Control.Exception (Exception (..), try)
|
import Control.Exception (Exception (..), try)
|
||||||
import Control.Monad (foldM, forM, forM_, forever, when, void)
|
import Control.Monad (foldM, forM, forM_, forever, void,
|
||||||
import Control.Monad.IO.Class (liftIO)
|
when)
|
||||||
|
import Control.Monad.IO.Class (liftIO)
|
||||||
import Data.Bifunctor
|
import Data.Bifunctor
|
||||||
import qualified Data.ByteString.Lazy.UTF8 as BSUL
|
import qualified Data.ByteString.Lazy.UTF8 as BSUL
|
||||||
import qualified Data.ByteString.UTF8 as BSU
|
import qualified Data.ByteString.UTF8 as BSU
|
||||||
import qualified Data.HashMap.Strict as HMap
|
import qualified Data.HashMap.Strict as HMap
|
||||||
import qualified Data.HashSet as HSet
|
import qualified Data.HashSet as HSet
|
||||||
import Data.Maybe (fromMaybe, isJust)
|
import Data.Maybe (fromMaybe, isJust)
|
||||||
import Data.String (fromString)
|
import Data.String (fromString)
|
||||||
import qualified Data.Text.Lazy as Txt
|
import qualified Data.Text.Lazy as Txt
|
||||||
import Data.Text.Normalize (NormalizationMode (NFC),
|
import Data.Text.Normalize (NormalizationMode (NFC), normalize)
|
||||||
normalize)
|
|
||||||
import Data.Time.Clock.POSIX
|
import Data.Time.Clock.POSIX
|
||||||
import Data.Typeable (Typeable)
|
import Data.Typeable (Typeable)
|
||||||
import qualified Network.HTTP.Client as HTTP
|
import qualified Network.HTTP.Client as HTTP
|
||||||
import qualified Network.HTTP.Types as HTTPT
|
import qualified Network.HTTP.Types as HTTPT
|
||||||
import System.Random
|
import System.Random
|
||||||
import Text.Read (readEither)
|
import Text.Read (readEither)
|
||||||
|
|
||||||
import qualified Network.Wai.Handler.Warp as Warp
|
import qualified Network.Wai.Handler.Warp as Warp
|
||||||
import Servant
|
import Servant
|
||||||
import Servant.Client
|
import Servant.Client
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue