src/bookofnim/deepdives/asyncPar


concurrency and parallelism

bookmark

TLDR

  • see runtimeMemory.nim for more on threads, thread synchronization, memory and GC
  • see servers.nim for async server stuff
  • you need the following for any thread related logic
    • required: the --threads:on switch (on by default since Nim 2.0)
    • should use: std/locks
  • you need the following for any async stuff
    • getFuturesInProgress requires --define:futureLogging
  • it's useful to think about threads and channels using the actor model
    • actor: a procedure executed on a thread to run some logic
      • it's simpler for actors to pull/push data via a channel to/from other actors
      • else you can pass data to an actor through its thread when the thread is created
      • an actor can create additional actors/threads/channels
    • channel: the bus over which data is sent between actors
      • channels defined on the main/current thread are available to all sibling actors
      • channels not defined on the main thread must be passed to other threads by ptr via an actor
    • thread: where execution occurs on a CPU; a 12-core machine has 12 parallel execution contexts (more with SMT)
      • only a single thread can execute at any given time per CPU core; otherwise the OS timeshares
      • Thread[void]: no data is passed via the thread to its actor; the actor uses a channel only
      • Thread[NotVoid]: on thread creation, an instance of NotVoid is expected and passed to its actor
        • in order to pass multiple params, use something like a tuple/array/etc

links

TODOs

concurrency and parallelism in nim

  • task: generally a process, e.g. an instance of a program
  • thread: child of a parent process, that can execute in parallel to other threads
    • threads live inside their parent process and share its address space; they don't spawn child processes
    • main process -> child thread -> execute this task
  • concurrency (performing tasks without waiting for other tasks) is highly evolved in nim
    • concurrent tasks can run on the same thread, interleaved (timeshared) to simulate multitasking
  • parallelism (performing tasks at the same time) is still evolving
    • the API is mature and stable, however, the dev team's goals have yet to be fully realized
      • e.g. parallel async await might not be available yet
    • parallel tasks are distributed across physical CPUs for true multitasking
      • or via simultaneous multithreading (SMT) like Intel's Hyper-Threading
    • if all CPUs are taken, timesharing occurs (concurrency semantics)

threads

  • each thread has its own GC heap (with the older refc GC; ARC/ORC use a shared heap) and memory sharing is restricted
    • improves efficiency and prevents data races on GC'd memory
  • procs used with threads require {.thread.} pragma
    • to create a thread from the proc you must use spawn/createThread
    • proc signature can't have var/ref/closure types (enforces the no-heap-sharing restriction)
    • implies procvar
    • vars local to threads must use {.threadvar.}
      • implies all the effects of {.global.}
      • can be declared but not initialized: a fresh copy is created for each thread
        • var x* {.threadvar.}: string is okay, but var x* {.threadvar.}: string = "abc" is not
  • exceptions
    • exceptions raised in a thread don't propagate to other threads
    • unhandled exceptions terminate the entire process
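A minimal sketch of the points above, assuming a Nim 2.x compiler (threads are on by default there; add --threads:on on 1.x). The names `worker`, `results` and `counter` are illustrative only. `counter` is a `{.threadvar.}`, so every thread starts from its own zero; each thread writes into its own slot of a global `int` array, which needs no lock because the indices never overlap and `int` isn't GC'd memory.

```nim
# sketch; needs --threads:on (the default since Nim 2.0)
when NimMajor >= 2:
  import std/typedthreads         # Nim 2.x home of Thread/createThread/joinThreads

var
  threads: array[4, Thread[int]]  # Thread[int]: each thread receives one int arg
  results: array[4, int]          # each thread writes only its own slot
  counter {.threadvar.}: int      # thread-local: every thread gets its own copy

proc worker(id: int) {.thread.} =
  # counter starts at 0 in every new thread because it is a threadvar
  for _ in 1..(id + 1) * 10:
    inc counter
  results[id] = counter           # disjoint indices, so no lock is needed here

for i in 0..threads.high:
  createThread(threads[i], worker, i)
joinThreads(threads)              # block until all 4 threads have finished
echo results                      # [10, 20, 30, 40]
```

The main thread's own `counter` is never touched by the workers, so it stays at 0.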

thread vs threadpool

  • thread (system): create a thread and save it to a variable
    • requires manually managing the thread, its tasks, and execution
    • resource intensive: use only when full control is required over a limited number of threads
    • executes procedures but doesn't return their results
  • spawn (threadpool): create a task and save its future result to a variable
    • you spawn a procedure that's added to a pool (queue) of tasks
    • the threadpool manages creation of threads, and distribution and execution of tasks
      • you don't have to worry about the number of threads or underutilizing created threads
    • optimized and efficient: can be used to run a lot of intensive tasks
    • executes any invocable expression and returns a FlowVar[T] with the future result
  • spawn and FlowVar
    • check flowVar.isReady instead of awaiting it directly to not block the current thread
      • e.g. in a loop with sleep to pause between iterations
    • when the flowVar is fulfilled, retrieve the value with ^flowVar
    • spawned procedures must be gcsafe and return void or a type compatible with FlowVar[T]
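A sketch of the isReady polling pattern, assuming std/threadpool is available (it is deprecated in recent Nim releases but still ships) and threads are enabled. `slowSquare` is a hypothetical stand-in for real work.

```nim
# sketch; needs --threads:on
import std/[os, threadpool]

proc slowSquare(x: int): int =
  sleep(50)                     # simulate some work
  x * x

let fv = spawn slowSquare(7)    # fv is a FlowVar[int]
var polls = 0
while not fv.isReady:           # non-blocking check
  inc polls                     # the current thread is free to do other work here
  sleep(10)                     # pause between iterations
let answer = ^fv                # doesn't block: the value is already available
echo answer, " after ", polls, " polls"
```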

thread pragmas

  • thread: this proc is intended for multitasking
  • threadvar: declares this var as thread-local
  • raises: should always be used to ensure a thread proc handles all its exceptions

system thread types

  • Thread[T] object

system thread procs

  • createThread and execute a proc on it
  • getThreadId of some thread
  • handle of Thread[T]
  • joinThread waits for a thread to finish
  • joinThreads waits for all given threads to finish
  • onThreadDestruction registers a proc called upon a thread's destruction (when it returns or raises)
  • pinToCpu sets the affinity for a thread

threadpool

  • implements parallel & spawn
  • abstraction over lower level system threads

threadpool types

  • FlowVar[T] future returned from a spawned proc containing a value of type T
  • FlowVarBase untyped base class for FlowVar
  • ThreadId

threadpool consts

  • MaxDistinguishedThread == 32
  • MaxThreadPoolSize == 256

threadpool operators

  • ^ blocks until the value is ready, then returns it; check fv.isReady first to avoid blocking

threadpool procs

  • awaitAndThen blocks until flowvar is available, then executes action(flowVar)
  • blockUntil flowvar is available
  • blockUntilAny flowvars are available; if all flowvars are already awaited returns -1
  • isReady true if flowvarBase value is ready; awaiting ready flowvars doesn't block
  • parallel block to run in parallel
  • pinnedSpawn always calls action on the worker thread with the given id
  • preferSpawn to determine if spawn/direct call is preferred; micro optimization
  • setMaxPoolSize changes the maximum pool size (default MaxThreadPoolSize)
  • setMinPoolSize changes the minimum pool size (default 4)
  • spawn action on a new actor; action is never invoked on the calling thread
  • sync waits for all spawned actors; i.e. like joinThreads
  • unsafeRead a flowvar; blocks until flowvar value is available
  • spawnX action on a new thread if a CPU core is ready, else on this thread; may block the producer, so prefer spawn
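The `parallel` block is easiest to see with the pi approximation from the Nim manual's parallel & spawn section, adapted here as a sketch. It needs `{.experimental: "parallel".}` and threads enabled; leaving the `parallel` block waits for every `spawn` inside it, so `ch` is fully populated afterwards.

```nim
# sketch; needs --threads:on; `parallel` is an experimental feature
import std/[math, threadpool]
{.experimental: "parallel".}

proc term(k: float): float = 4 * math.pow(-1, k) / (2*k + 1)

proc pi(n: int): float =
  var ch = newSeq[float](n + 1)
  parallel:
    for k in 0..ch.high:
      ch[k] = spawn term(float(k))   # each term may run on a pool thread
  # the end of the parallel block acts as a barrier for all spawned tasks
  for k in 0..ch.high:
    result += ch[k]

echo pi(5000)
```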

channels

  • designed for system.threads, unstable when used with spawn
  • deeply copies non-cyclic data from thread X to thread Y
  • channels declared in the main thread (module scope) are simpler and shared across all threads
    • else you can declare one within a thread proc's body and send its ptr to another thread
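A minimal producer/consumer sketch of a module-scope channel, assuming threads are enabled. The sender is a `Thread[void]` actor that only talks through `relay`; using an empty string as the stop signal is an arbitrary convention chosen for this example.

```nim
# sketch; needs --threads:on (the default since Nim 2.0)
when NimMajor >= 2:
  import std/typedthreads

var
  relay: Channel[string]      # module scope: visible to every thread
  sender: Thread[void]        # Thread[void]: no arg, talks via the channel only

proc sendAction() {.thread.} =
  for msg in ["hello", "from", "another thread"]:
    relay.send(msg)           # msg is deep-copied into the channel
  relay.send("")              # empty string used as a stop signal here

relay.open()                  # unbounded (maxItems = 0)
createThread(sender, sendAction)

var received: seq[string]
while true:
  let msg = relay.recv()      # blocks until a message arrives
  if msg.len == 0: break
  received.add msg

joinThread(sender)
relay.close()
echo received                 # @["hello", "from", "another thread"]
```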

system channel types

  • Channel[T] for relaying messages of type T

system channel procs

  • close a channel permanently and free its resources
  • open or update a channel with max size int (0 == unlimited)
  • peek at total messages in channel, -1 if channel closed; use tryRecv instead to avoid race conditions
  • ready true if some thread is waiting for new messages
  • recv data; blocks until a message is available
  • send deeply copied data; blocks while a bounded channel is full
  • tryRecv without blocking; returns (dataAvailable: bool, msg: T)
  • trySend deeply copied data without blocking; returns false if the channel is full
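The non-blocking procs are easiest to observe on a bounded channel. This sketch stays on a single thread so the results are deterministic; it still needs threads enabled because `Channel[T]` only exists then.

```nim
# sketch; single-threaded on purpose, but needs --threads:on
var ch: Channel[int]
ch.open(maxItems = 1)              # cap the channel at 1 message

let first = ch.trySend(1)          # true: there was room
let second = ch.trySend(2)         # false: the channel already holds maxItems messages
let waiting = ch.peek()            # 1 message in the channel
let got = ch.tryRecv()             # non-blocking; got.dataAvailable is true here
let empty = ch.tryRecv()           # nothing left, so dataAvailable is false
ch.close()
echo (first, second, waiting, got.msg, empty.dataAvailable)
```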

locks

  • locks and condition variables

lock types

  • Cond SysCond condition variable
  • Lock SysLock whether its re-entrant/not is unspecified

lock procs

  • acquire the given lock
  • broadcast unblocks threads blocked on the specified condition variable
  • deinitCond frees resources associated with condition var
  • deinitLock frees resources associated with lock
  • initCond initializes a condition var
  • initLock initializes a lock
  • release a lock
  • signal to a condition var
  • tryAcquire a given lock
  • wait on the condition var
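A sketch of the usual condition variable handshake, assuming threads are enabled; `waitForData`, `dataReady` and `sawReady` are hypothetical names. `wait` is called with the lock held, and the flag is re-checked in a loop because a waiting thread can wake without being signalled.

```nim
# sketch; needs --threads:on (the default since Nim 2.0)
import std/locks
when NimMajor >= 2:
  import std/typedthreads

var
  mtx: Lock
  cv: Cond
  dataReady = false          # shared flag, only touched while holding mtx
  sawReady = false           # set by the waiter, read by main after joinThread
  waiter: Thread[void]

proc waitForData() {.thread.} =
  acquire(mtx)
  while not dataReady:       # re-check in a loop to survive spurious wakeups
    wait(cv, mtx)            # releases mtx while waiting, re-acquires before returning
  sawReady = dataReady
  release(mtx)

initLock(mtx)
initCond(cv)
createThread(waiter, waitForData)

acquire(mtx)
dataReady = true
signal(cv)                   # wake one thread blocked on cv
release(mtx)

joinThread(waiter)
deinitCond(cv)
deinitLock(mtx)
echo sawReady                # true
```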

lock pragmas

  • guard assigns a lock to a variable; the compiler errors on reads/writes made without holding the lock

lock templates

  • withLock: acquires > executes body > releases, useful with guarded variables
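`guard` and `withLock` together, as a sketch with hypothetical names: four threads increment a guarded counter. Removing the `withLock` around `inc hits` should make the compiler reject the access to the guarded variable.

```nim
# sketch; needs --threads:on (the default since Nim 2.0)
import std/locks
when NimMajor >= 2:
  import std/typedthreads

var
  L: Lock
  hits {.guard: L.}: int            # only accessible inside a section holding L
  workers: array[4, Thread[int]]
  total: int

proc addHits(n: int) {.thread.} =
  for _ in 1..n:
    withLock L:                     # acquire L, run the body, release L (even on error)
      inc hits

initLock(L)
for i in 0..workers.high:
  createThread(workers[i], addHits, 1000)
joinThreads(workers)

withLock L:
  total = hits
deinitLock(L)
echo total                          # 4000
```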

asyncdispatch

  • asynchronous IO: dispatcher (event loop), future and reactor (sync-style) await
  • the primary way to create and consume async programs
  • dispatcher: simple event loop that buffers events until they're polled (pulled)
    • linux: uses epoll
    • windows: IO Completion Ports
    • other: select
  • poll doesn't return events; instead, Futures are completed with a value/error when their events occur
    • always use a reactor pattern (IMO) e.g. waitFor/runForever
      • procs returning Future[T] (or nothing, i.e. Future[void]) require the {.async.} pragma to enable await in the body
        • an async proc is suspended at each await and resumed once the awaited Future completes
        • the dispatcher invokes the next async proc while the current is suspended
        • any expression producing a Future can be awaited: vars, object fields, proc calls
        • awaited Futures with unhandled exceptions are rethrown
          • use yield future, then check future.failed, instead of try: await future except: for increased reliability
    • alternatively (IMO not preferred) use the proactor pattern
      • you can check Future.finished for success/failure and .failed specifically
      • or pass a callback
  • Futures ignore {.raises: [].} effects
  • addWrite/Read exist for adapting unix-like libraries to be async on windows; avoid if possible
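A sketch of the reactor style described above using only std/asyncdispatch; the proc names are hypothetical. `slow` and `fast` are started before either is awaited so their sleeps overlap, and `mayFail` shows the `yield` plus `.failed` pattern instead of `try: await ... except`.

```nim
import std/asyncdispatch

proc delayedEcho(msg: string, ms: int): Future[string] {.async.} =
  await sleepAsync(ms)          # suspended here; the dispatcher runs other futures
  return msg

proc mayFail(): Future[int] {.async.} =
  await sleepAsync(10)
  raise newException(ValueError, "oops")

proc main(): Future[seq[string]] {.async.} =
  var lines: seq[string]
  # start both futures before awaiting either, so their sleeps overlap
  let slow = delayedEcho("see ya later alligator", 50)
  let fast = delayedEcho("after while crocodile", 10)
  let a = await fast
  lines.add a
  let b = await slow
  lines.add b

  # yield waits for the future without re-raising; inspect it afterwards
  let f = mayFail()
  yield f
  if f.failed:
    lines.add("caught: " & f.readError().msg)
  return lines

let lines = waitFor main()      # reactor style: block until main() completes
echo lines
```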

asyncdispatch types

  • AsyncEvent ptr
  • AsyncFD file descriptor
  • Callback proc(AsyncFD)
  • CompletionData object
    • fd: AsyncFD
    • cb: Callback
    • cell: ForeignCell (system)
  • CustomRef
  • PDispatcher ref of PDispatcherBase
    • ioPort: Handle (winlean)
    • handles: HashSet[AsyncFD]

asyncdispatch procs

  • accept new socket connection returning future client socket
  • acceptAddr new socket connection returning future (client, address)
  • activeDescriptors for the current event loop (doesnt require syscall)
  • addEvent registers cb to invoke upon some AsyncEvent
  • addProcess registers cb to invoke when some PID exits
  • addRead starts watching AsyncFD and invokes cb when its read-ready; only useful for windows
  • addTimer invokes cb after/every int milliseconds
  • addWrite starts watching AsyncFD and invokes cb when its write-ready; only useful for windows
  • callSoon invoke cb when control returns to the event loop
  • close an AsyncEvent
  • closeSocket and unregister it
  • connect to socket FD at some remote addr, port and domain
  • contains true if AsyncFD is registered on the current threads event loop
  • createAsyncNativeSocket
  • dial and connect to addr:port via some protocol (e.g. TCP for IPv4/6); tries each resolved address until one succeeds
  • drain and process as many events as possible until timeout X; errors if no events are pending
  • getGlobalDispatcher
  • getIoHandler for some Dispatcher; supports both win & linux
  • hasPendingOperations only checks global dispatcher
  • maxDescriptors of the current process (requires syscall); only for Windows, Linux, OSX, BSD
  • newAsyncEvent threadsafe; not auto registered with a dispatcher
  • newCustom CustomRef
  • newDispatcher for this thread
  • poll waits up to X milliseconds for events and processes them as they complete; raises ValueError if none are pending
  • readAll from a FutureStream[string]; completes when all data is consumed
  • recv from socket and complete once up to/before X bytes read/socket disconnects
  • recvFromInto buf of size X, datagram from socket; senders addr saved in saddr and saddrlen
  • recvInto buf of size X, data from socket; completes once up to/before X bytes read/socket disconnects
  • register AsyncFD with some dispatcher
  • runForever the global dispatcher poll event loop
  • send X bytes from buf to socket; complete once all data sent
  • sendTo socket some data
  • setGlobalDispatcher
  • setInheritable this file descriptor by child processes; not available on every platform, check with declared(setInheritable)
  • sleepAsync for X milliseconds
  • trigger AsyncEvent
  • unregister AsyncEvent
  • waitFor and block the current thread until Future completes
  • withTimeout wait for this Future or return false if timeout expires
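`all`, `withTimeout` and `sleepAsync` combined in one hedged sketch; `work` is a hypothetical coroutine that doubles its input after a delay. The timeouts are generous so the outcome shouldn't depend on machine load.

```nim
import std/asyncdispatch

proc work(x: int, ms: int): Future[int] {.async.} =
  await sleepAsync(ms)
  return x * 2

proc main(): Future[(seq[int], bool, bool)] {.async.} =
  # all: completes when every future completes; results keep the input order
  let doubled = await all(@[work(1, 30), work(2, 10), work(3, 20)])
  # withTimeout: true if the future finished before the timeout, else false
  let quick = await withTimeout(work(4, 10), 500)
  let slow = await withTimeout(work(5, 500), 10)
  return (doubled, quick, slow)

let (doubled, quick, slow) = waitFor main()
echo doubled, " ", quick, " ", slow
```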

asyncdispatch macros

  • async converts async procedures into iterators and yield statements
  • multisync converts async procs into both async & sync procs (removes await calls)

asyncfutures

  • primitives for creating and consuming futures
  • the other async modules build on asyncfutures, which generally isn't imported directly

asyncfutures types

  • Future[T] ref of FutureBase
    • value
  • FutureBase ref of RootObj
    • callbacks: CallbackList
    • finished: bool
    • error: Exception
    • errorStackTrace: string
  • FutureError object of Defect
    • cause: FutureBase
  • FutureVar[T] distinct Future[T]

asyncfutures consts

  • isFutureLoggingEnabled

asyncFutures procs

  • and returns future X when future Y and Z complete
  • or returns future X when future Y or Z complete
  • addCallback to execute when future X completes; accepts FutureBase/Future[T]
  • all returns when futures 0..X complete
  • asyncCheck discards a future, but raises its exception if it fails
  • callSoon some callback on next tick of asyncdispatch if running, else immediately
  • clean resets finished status of some future
  • clearCallbacks
  • complete future X with value Y
  • fail future X with exception Y
  • failed bool
  • finished bool
  • getCallSoonProc
  • mget a mutable value stored in future
  • newFuture of type T owned by proc X
  • read the value of a finished future
  • readError of a failed future
  • setCallSoonProc changes the implementation of callSoon
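Futures can also be driven by hand with the primitives above. In this sketch (names hypothetical, in the spirit of the module's `fake1`/`fakeFailed`) one future is completed and one failed; since `complete` may schedule callbacks through `callSoon`, the event loop is run briefly before the callback counter is checked.

```nim
import std/asyncdispatch     # re-exports std/asyncfutures

var calls = 0                # plain int global, so the callback stays gcsafe

proc onDone(f: Future[string]) {.gcsafe.} =
  if f.read() == "done":
    inc calls

let ok = newFuture[string]("success example")   # the name helps when debugging
let bad = newFuture[string]("failed example")

ok.addCallback(onDone)
ok.complete("done")                        # runs or schedules onDone via callSoon
bad.fail(newException(ValueError, "oops"))

waitFor sleepAsync(1)        # run the event loop briefly so scheduled callbacks fire

echo ok.finished, " ", ok.failed, " ", ok.read()
echo bad.finished, " ", bad.failed, " ", bad.readError().msg
echo calls
```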

asyncfile

  • asynchronous reads & writes
  • unlike std/os, you first need to open an AsyncFile via openAsync
    • most procs require an AsyncFile and not a filename string

asyncfile types

  • AsyncFile = ref object
    • fd: AsyncFD
    • offset: int64
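A round trip through `openAsync`, `write`, `getFileSize` and `read`, as a sketch; it writes to a hypothetical file under `getTempDir()` instead of the module's `afilepath`, so it doesn't depend on `/tmp/or/` existing.

```nim
import std/[asyncdispatch, asyncfile, os]

# hypothetical path, used only for this example
let path = getTempDir() / "asyncPar_example.txt"

proc roundTrip(): Future[string] {.async.} =
  let writer = openAsync(path, fmWrite)    # AsyncFile wraps an AsyncFD plus an offset
  await writer.write("see ya later alligator")
  writer.close()

  let reader = openAsync(path, fmRead)
  let size = reader.getFileSize()          # in bytes, as an int64
  result = await reader.read(int(size))
  reader.close()

let text = waitFor roundTrip()
echo text
removeFile(path)
```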

asyncfile procs

adjust channel size capping at 1 message
unsuccessful because total msg > channel size 1
TODO: this doesn't echo @see https://forum.nim-lang.org/t/9946#65549

Vars

bf: Thread[void]
fake2 = newFutureVar("FutureVar example")
FutureVar[T]
gf: Thread[void]
iAmGuarded {.guard: L.}: string = "require r/w to occur through my lock"
L: Lock
numThreads: array[4, Thread[int]]
reader = openAsync(afilepath, fmRead)
relay: Channel[string]
a queue for string data
writer = openAsync(afilepath, fmWrite)

Lets

afterwhile = laterGater("after while crocodile")
cursize = getFileSize(writer)
fake1 = newFuture("success example")
Future[T]
fakeFailed = newFuture("failed example")
provide a name for debugging
fv1 = f1()
Source   Edit  
seeya = laterGater("see ya later aligator")
someErr =
  (ref ValueError)(msg: "oops", parent: nil)

Consts

afilepath = "/tmp/or/rary.txt"

Procs

proc echoAction[T](x: T): void {.thread.}
withLock to acquire, execute & release automatically
proc f1(): Future[string] {....stackTrace: false, raises: [KeyError, Exception],
                            tags: [RootEffect, TimeEffect], forbids: [].}
handling exceptions the correct way
proc f2(): Future[string] {....stackTrace: false, raises: [KeyError, Exception],
                            tags: [TimeEffect, RootEffect], forbids: [].}
handling exceptions the wrong way
proc laterGater(s: string): Future[void] {....stackTrace: false,
    raises: [KeyError, Exception], tags: [TimeEffect, RootEffect], forbids: [].}
proc receiveAction(): void {.thread, ...raises: [ValueError], tags: [], forbids: [].}
action for consuming data; recv blocks until a message is received
proc receiveActionA(): void {.thread, ...raises: [ValueError], tags: [TimeEffect],
                              forbids: [].}
action for consuming data without blocking
proc sendAction(): void {.thread, ...raises: [], tags: [TimeEffect], forbids: [].}
proc sendActionA(): void {.thread, ...raises: [], tags: [TimeEffect], forbids: [].}
action for sending data without blocking