concurrency and parallelism
TLDR
- see runtimeMemory.nim for more on threads, thread synchronization, memory and GC
- see servers.nim for async server stuff
- you need the following for any thread related logic
- required: --threads:on switch
- should use: std/locks
- you need the following for any async stuff
- getFuturesInProgress requires --define:futureLogging
- it's useful to think about threads and channels using the actor model
- actor: a procedure recreated on a thread to execute some logic
- it's simpler for actors to pull/push data via a channel to/from other actors
- else you can pass data to an actor through its thread when the thread is created
- an actor can create additional actors/threads/channels
- channel: the bus in which data is sent between actors
- channels defined on the main/current thread are available to all sibling actors
- channels not defined on the main thread must be passed to other threads by ptr via an actor
- thread: where execution occurs on a CPU, 12-core machine has 12 concurrent execution contexts
- only a single thread can execute at any given time per cpu, timesharing occurs otherwise
- Thread[void]: no data is passed via thread to its actor; the actor uses a channel only
- Thread[NotVoid]: on thread creation, an instance of NotVoid is expected and passed to its actor
- in order to pass multiple params, use something like a tuple/array/etc
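The two thread flavours above can be sketched roughly as follows. This is a minimal sketch, not the module's own example: the names `results`, `viaChannel`, `viaArg` and `runActors` are made up for illustration, and it assumes compilation with --threads:on.

```nim
# compile with: nim c --threads:on example.nim

var results: Channel[string]   # module-scope channel, visible to every thread

# actor for a Thread[void]: receives no argument, talks through the channel only
proc viaChannel() {.thread.} =
  results.send("from Thread[void]")

# actor for a Thread[tuple]: several params packed into a single tuple argument
proc viaArg(arg: tuple[id, count: int]) {.thread.} =
  results.send($arg.id & ":" & $arg.count)

proc runActors(): seq[string] =
  results.open()
  var a: Thread[void]
  var b: Thread[tuple[id, count: int]]
  createThread(a, viaChannel)
  createThread(b, viaArg, (id: 1, count: 2))
  joinThread(a)
  joinThread(b)
  # both threads have finished, so both messages are already queued
  for _ in 1 .. 2:
    result.add results.recv()
  results.close()
```

The order of the two messages depends on scheduling, so callers should not rely on it.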
links
- other
- system
- pkgs
- niche
TODOs
- passing channels safely
- multiple async backend support
- add more sophisticated asyncdispatch examples
- acquiring a lock for a channel is useless, locks only work with guarded vars
- ^ update examples
concurrency and parallelism in nim
- task: generally a process, e.g. an instance of a program
- thread: child of a parent process, that can execute in parallel to other threads
- threads will spawn child processes to execute their tasks
- main process -> child thread -> child threads process -> execute this task
- concurrency: performing tasks without waiting for other tasks; support is highly evolved
- concurrent tasks execute on the same thread with timesharing to simulate multitasking
- parallelism: performing tasks at the same time; support is still evolving
- the API is mature and stable, however, the dev team's goals have yet to be fully realized
- e.g. parallel async await might not be available yet
- parallel tasks are distributed across physical CPUs for true multitasking
- or via simultaneous multithreading (SMT) like Intel's Hyper-Threading
- if all CPUs are taken, timesharing occurs (concurrency semantics)
threads
- each thread has its own GC heap and mem sharing is restricted
- improves efficiency and prevents race conditions
- procs used with threads require {.thread.} pragma
- to create a thread from the proc you must use spawn/createThread
- proc signature can't have var/ref/closure types (enforces the no heap sharing restriction)
- implies procvar
- vars local to threads must use {.threadvar.}
- implies all the effects of {.global.}
- can be defined but not initialized: it will be replicated at thread creation
- var x* {.threadvar.}: string is okay, but not .... = "abc"
- exceptions
- handled exceptions don't propagate across threads
- unhandled exceptions terminate the entire process
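A small sketch of the rules above, assuming --threads:on. The names `counter`, `bump` and `demo` are hypothetical; the point is that a {.threadvar.} is declared without an initializer, each thread sees its own copy, and the thread proc handles its own exceptions.

```nim
var counter {.threadvar.}: int   # declared, not initialized; one copy per thread

proc bump(times: int) {.thread.} =
  # handle errors here: an unhandled exception would terminate the process
  try:
    if times < 0:
      raise newException(ValueError, "times must not be negative")
    for _ in 1 .. times:
      inc counter                # touches only this thread's copy
  except ValueError:
    discard

proc demo(): int =
  counter = 10                   # the main thread's copy
  var t: Thread[int]
  createThread(t, bump, 5)
  joinThread(t)
  result = counter               # unchanged: the worker incremented its own copy
```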
thread vs threadpool
- thread (system): create and save a thread to a variable
- requires manually managing the thread, its tasks, and execution
- are resource intensive: use only when full control is required on a limited number of threads
- executes procedures but doesn't return their results
- spawn (threadpool): create a task and save its future result to a variable
- you spawn a procedure that's added to a pool (queue) of tasks
- threadpool manages creation of threads, distribution and execution of tasks
- you don't have to worry about the number of threads or underutilizing created threads
- are optimized and efficient: can be used for running a lot of intensive tasks
- executes any invocable expression and returns a FlowVar[T] with the future result
- spawn and FlowVar
- check flowVar.isReady instead of awaiting it directly to not block the current thread
- e.g in a loop with sleep to pause between iterations
- when the flowVar is fulfilled retrieve the value with ^flowVar
- a FlowVar's T must be a ref, string, seq, or a type that contains no garbage-collected memory
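The polling pattern described above might look like this. It is only a sketch: `square` and `squareWithoutBlocking` are hypothetical names, it assumes --threads:on, it assumes a plain int result can be carried by a FlowVar, and newer Nim versions may print a deprecation warning for std/threadpool.

```nim
import std/[threadpool, os]

proc square(x: int): int =
  x * x

proc squareWithoutBlocking(x: int): int =
  let fv: FlowVar[int] = spawn square(x)
  # poll isReady instead of blocking on ^fv straight away
  while not fv.isReady:
    sleep(1)          # pause between checks; other work could run here
  result = ^fv        # the value is ready, so this returns immediately
```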
thread pragmas
- thread: this proc is intended for multitasking
- threadvar: declares this var as thread-local
- raises: should always be used to ensure a thread proc handles all its exceptions
system thread types
- Thread[T] object
system thread procs
- createThread and execute a proc on it
- getThreadId of some thread
- handle of Thread[T]
- joinThread back to main process when finished
- joinThreads back to main process when finished
- onThreadDestruction called upon a thread's destruction (when it returns/throws)
- pinToCpu sets the affinity for a thread
threadpool
- implements parallel & spawn
- abstraction over lower level system threads
threadpool types
- FlowVar[T] future returned from a spawned proc containing a value T
- FlowVarBase untyped base class for FlowVar
- ThreadId
threadpool consts
- MaxDistinguishedThread == 32
- MaxThreadPoolSize == 256
threadpool operators
- ^ blocks if the value isn't ready, then always returns its value; check blah.isReady first as a workaround
threadpool procs
- awaitAndThen blocks until flowvar is available, then executes action(flowVar)
- blockUntil flowvar is available
- blockUntilAny flowvars are available; if all flowvars are already awaited returns -1
- isReady true if flowvarBase value is ready; awaiting a ready flowvar doesn't block
- parallel block to run in parallel
- pinnedSpawn always call action Y on actor X
- preferSpawn to determine if spawn/direct call is preferred; micro optimization
- setMaxPoolSize sets the maximum pool size (capped at MaxThreadPoolSize)
- setMinPoolSize from the default 4
- spawn action on a new actor; action is never invoked on the calling thread
- sync spawned actors; i.e. joinThreads
- unsafeRead a flowvar; blocks until flowvar value is available
- spawnX action on new thread if a CPU core is ready; else on this thread, which may block the producer; prefer spawn
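As a rough illustration of spawn plus sync from the list above (hypothetical names `work` and `total`, assuming --threads:on), several tasks are queued and a single barrier waits for all of them before the results are read:

```nim
import std/threadpool

proc work(i: int): int =
  i * 2

proc total(n: int): int =
  var pending = newSeq[FlowVar[int]](n)
  for i in 0 ..< n:
    pending[i] = spawn work(i)   # queued on the pool, never run on this thread
  sync()                         # barrier: wait for every spawned task
  for fv in pending:
    result += ^fv                # all values are ready after sync()
```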
channels
- designed for system.threads, unstable when used with spawn
- deeply copies non cyclic data from thread X to thread Y
- channels declared in the main thread (module scope) are simpler and shared across all threads
- else you can declare within the body of proc thread and send the ptr to another
system channel types
- Channel[T] for relaying messages of type T
system channel procs
- close a channel permanently and free its resources
- open or update a channel with size int (0 == unlimited)
- peek at total messages in channel, -1 if channel closed; use tryRecv instead to avoid race conditions
- ready true if some thread is waiting for new messages
- recv data; blocks its channel scope until delivered
- send deeply copied data; blocks its channel scope until sent
- tryRecv (bool, msg)
- trySend deeply copied data without blocking
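A minimal sketch of the channel procs above, assuming --threads:on. The names `inbox`, `producer` and `collect` are invented for the example; the consumer uses tryRecv so it never blocks on an empty channel.

```nim
import std/os

var inbox: Channel[int]       # module scope, shared by all threads

proc producer() {.thread.} =
  for i in 1 .. 3:
    inbox.send(i)             # each message is copied into the channel

proc collect(): seq[int] =
  inbox.open()                # default size 0: unlimited queue
  var t: Thread[void]
  createThread(t, producer)
  while result.len < 3:
    let (ok, msg) = inbox.tryRecv()   # returns at once, with or without data
    if ok:
      result.add msg
    else:
      sleep(1)                # nothing yet; back off briefly
  joinThread(t)
  inbox.close()
```

With a single producer the messages arrive in the order they were sent.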
locks
- locks and condition vars
lock types
- Cond SysCond condition variable
- Lock SysLock whether it's re-entrant or not is unspecified
lock procs
- acquire the given lock
- broadcast unblocks threads blocked on the specified condition variable
- deinitCond frees resources associated with condition var
- deinitLock frees resources associated with lock
- initCond initializes a condition var
- initLock initializes a lock
- release a lock
- signal to a condition var
- tryAcquire a given lock
- wait on the condition var
lock pragmas
- guard assigns a lock to a variable; the compiler errors on any read/write attempted without acquiring the lock
lock templates
- withLock: acquires > executes body > releases, useful with guarded variables
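The guard pragma and withLock can be combined roughly like this. It is a sketch under the assumption of --threads:on; `tallyLock`, `tally`, `addMany` and `run` are hypothetical names.

```nim
import std/locks

var tallyLock: Lock
var tally {.guard: tallyLock.}: int   # access outside the lock is a compile error

proc addMany(n: int) {.thread.} =
  for _ in 1 .. n:
    withLock tallyLock:               # acquire, run the body, release
      inc tally

proc run(): int =
  initLock(tallyLock)
  withLock tallyLock:
    tally = 0
  var workers: array[4, Thread[int]]
  for i in 0 .. high(workers):
    createThread(workers[i], addMany, 1000)
  joinThreads(workers)
  withLock tallyLock:
    result = tally
  deinitLock(tallyLock)
```

Without the lock the four workers could interleave their increments and lose updates; with it the total is always 4 * 1000.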
asyncdispatch
- asynchronous IO: dispatcher (event loop), future and reactor (sync-style) await
- the primary way to create and consume async programs
- dispatcher: simple event loop that buffers events to be polled (pulled) from the stack
- linux: uses epoll
- windows: IO Completion Ports
- other: select
- poll: doesn't return events; instead it completes the Futures tied to those events with a value/error
- always use a reactor pattern (IMO) e.g. waitFor/runForever
- procs of type Future[T | void] require {.async.} pragma for enabling await in the body
- awaited procs are suspended until and resumed once their Future arg is completed
- the dispatcher invokes the next async proc while the current is suspended
- vars, objects and other procs can be awaited
- awaited Futures with unhandled exceptions are rethrown
- yield Future; f.failed instead of try: await Future except: for increased reliability
- alternatively (IMO not preferred) use the proactor pattern
- you can check Future.finished for success/failure and .failed specifically
- or pass a callback
- Futures ignore {.raises.} effects
- addWrite/Read exist for adapting unix-like libraries to be async on windows; avoid if possible
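A short sketch of the await and yield styles described above. The procs `double`, `risky` and `main` are hypothetical; `yield f` followed by `f.failed` is used so the error is inspected instead of rethrown.

```nim
import std/asyncdispatch

proc double(x: int): Future[int] {.async.} =
  await sleepAsync(10)        # suspended here; the dispatcher can run other work
  return x * 2

proc risky(): Future[int] {.async.} =
  await sleepAsync(1)
  raise newException(ValueError, "boom")

proc main(): Future[string] {.async.} =
  let v = await double(21)    # resumed once the Future completes
  let f = risky()
  yield f                     # wait for completion without rethrowing
  if f.failed:
    return $v & " then error: " & f.readError.msg
  return $v

echo waitFor main()           # waitFor drives the event loop until main is done
```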
asyncdispatch types
- AsyncEvent ptr
- AsyncFD file descriptor
- Callback proc(AsyncFD)
- CompletionData object
- fd: AsyncFD
- cb: Callback
- cell: ForeignCell (system)
- CustomRef
- PDispatcher ref of PDispatcherBase
- ioPort: Handle (winlean)
- handles: HashSet[AsyncFD]
asyncdispatch procs
- accept new socket connection returning future client socket
- acceptAddr new socket connection returning future (client, address)
- activeDescriptors for the current event loop (doesn't require syscall)
- addEvent registers cb to invoke upon some AsyncEvent
- addProcess registers cb to invoke when some PID exits
- addRead starts watching AsyncFD and invokes cb when it's read-ready; only useful for windows
- addTimer invokes cb after/every int milliseconds
- addWrite starts watching AsyncFD and invokes cb when it's write-ready; only useful for windows
- callSoon invoke cb when control returns to the event loop
- close an AsyncEvent
- closeSocket and unregister it
- connect to socket FD at some remote addr, port and domain
- contains true if AsyncFD is registered on the current thread's event loop
- createAsyncNativeSocket
- dial and connect to addr:port via some protocol (e.g. TCP for IPv4/6); tries until successful
- drain and process as many events until timeout X; errors if no events are pending
- getGlobalDispatcher
- getIoHandler for some Dispatcher; supports both win & linux
- hasPendingOperations only checks global dispatcher
- maxDescriptors of the current process (requires syscall); only for Windows, Linux, OSX, BSD
- newAsyncEvent threadsafe; not auto registered with a dispatcher
- newCustom CustomRef
- newDispatcher for this thread
- poll for X then wait to process pending events as they complete; throws ValueError if none exist
- readAll FutureStream[string] that completes when all data is consumed
- recv from socket and complete once up to/before X bytes read/socket disconnects
- recvFromInto buf of size X, datagram from socket; sender's addr saved in saddr and saddrlen
- recvInto buf of size X, data from socket; completes once up to/before X bytes read/socket disconnects
- register AsyncFD with some dispatcher
- runForever the global dispatcher poll event loop
- send X bytes from buf to socket; complete once all data sent
- sendTo socket some data
- setGlobalDispatcher
- setInheritable this file descriptor by child processes; not guaranteed check with declared()
- sleepAsync for X milliseconds
- trigger AsyncEvent
- unregister AsyncEvent
- waitFor and block the current thread until Future completes
- withTimeout wait for this Future or return false if timeout expires
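As an illustration of sleepAsync, withTimeout and waitFor from the list above, a sketch with the hypothetical procs `slow` and `finishesWithin`:

```nim
import std/asyncdispatch

proc slow(ms: int): Future[int] {.async.} =
  await sleepAsync(ms)
  return ms

proc finishesWithin(ms, limit: int): Future[bool] {.async.} =
  let fut = slow(ms)
  # true if fut completed within `limit` ms, false if the timeout expired first
  return await fut.withTimeout(limit)
```

Calling `waitFor finishesWithin(5, 500)` blocks the current thread until the inner future or the timeout settles.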
asyncdispatch macros
- async converts async procedures into iterators and yield statements
- multisync converts async procs into both async & sync procs (removes await calls)
asyncfutures
- primitives for creating and consuming futures
- the other async modules build on asyncfutures, so it generally isn't imported directly
asyncfutures types
- Future[T] ref of FutureBase
- value
- FutureBase ref of RootObj
- callbacks: CallbackList
- finished: bool
- error: Exception
- errorStackTrace: string
- FutureError object of Defect
- cause: FutureBase
- FutureVar[T] distinct Future[T]
asyncfutures consts
- isFutureLoggingEnabled
asyncfutures procs
- and returns future X when future Y and Z complete
- or returns future X when future Y or Z complete
- addCallback to execute when future X completes; accepts FutureBase/Future[T]
- all returns when futures 0..X complete
- asyncCheck discards a future, but raises if that future finishes with an error
- callSoon some callback on next tick of asyncdispatcher if running, else immediately
- clean resets finished status of some future
- clearCallbacks
- complete future X with value Y
- fail future X with exception Y
- failed bool
- finished bool
- getCallSoonProc
- mget a mutable value stored in future
- newFuture of type T owned by proc X
- read the value of a finished future
- readError of a failed future
- setCallSoonProc change implementation of callSoon
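The future primitives above can be exercised by hand. The sketch below uses the hypothetical helpers `resolved`, `rejected` and `sumAll`, and imports asyncdispatch (which re-exports asyncfutures) so that waitFor is available.

```nim
import std/asyncdispatch

proc resolved(v: int): Future[int] =
  ## Hand-made future that is already completed with a value.
  result = newFuture[int]("resolved")   # the string names the owner, for debugging
  result.complete(v)

proc rejected(): Future[int] =
  ## Hand-made future that is already failed.
  result = newFuture[int]("rejected")
  result.fail(newException(IOError, "nope"))

proc sumAll(): int =
  # all() yields one future holding every value, in argument order
  for v in waitFor all(resolved(1), resolved(2), resolved(3)):
    result += v

let ok = resolved(3)
doAssert ok.finished and not ok.failed
doAssert ok.read() == 3

let bad = rejected()
doAssert bad.finished and bad.failed    # finished is true for failures too
```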
asyncfile
- asynchronous reads & writes
- unlike std/os you need to get an FD on a file first via openAsync
- most procs require an AsyncFD and not a filename string
asyncfile types
- AsyncFile = ref object
- fd: AsyncFD
- offset: int64
asyncfile procs
- close a file
- get | setFilePos | Size
- newAsyncFile from an AsyncFD
- openAsync file X in mode Y returning AsyncFile; all other procs require an AsyncFile
- readAll | Buffer | Line | ToStream
- writeBuffer | FromStream
- writeFromStream: perfect for saving streamed data to a file without wasting memory
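A rough round trip through asyncfile, assuming a writable temp directory. The proc `roundTrip` and the file name are hypothetical; note the file is opened with openAsync first and every later call takes the AsyncFile.

```nim
import std/[asyncdispatch, asyncfile, os]

proc roundTrip(path, text: string): Future[string] {.async.} =
  var f = openAsync(path, fmWrite)    # get an AsyncFile before anything else
  await f.write(text)
  f.close()
  f = openAsync(path, fmRead)
  result = await f.readAll()
  f.close()

let demoPath = getTempDir() / "asyncfile_demo.txt"   # hypothetical scratch file
echo waitFor roundTrip(demoPath, "hello")
removeFile(demoPath)
```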
Procs
proc echoAction[T](x: T): void {.thread.}
- withLock to acquire, execute & release automatically
proc laterGater(s: string): Future[void] {....stackTrace: false, raises: [KeyError, Exception], tags: [TimeEffect, RootEffect], forbids: [].}
proc receiveAction(): void {.thread, ...raises: [ValueError], tags: [], forbids: [].}
- action for consuming data; recv blocks its channel's scope until a msg is received
proc receiveActionA(): void {.thread, ...raises: [ValueError], tags: [TimeEffect], forbids: [].}
- action for consuming data without blocking
proc sendAction(): void {.thread, ...raises: [], tags: [TimeEffect], forbids: [].}
proc sendActionA(): void {.thread, ...raises: [], tags: [TimeEffect], forbids: [].}
- action for sending data without blocking