workerpool.jl

oUqE2/src/ is a source file in module oUqE2
    
1
    

			
			
			
			
			@enum
			
			 
			
			
			
			



			PoolState
	
			
			 
			
			
			
			begin
			
			
    
			
			



			Done
	
			
			
    
			
			



			Running
	
			
			
    
			
			



			Failed
	
			
			

			
			
			end
			
			


			
			
			
			
			struct
			
			 
			
			
			
			
			



			PoolFailedException
	
			
			 
			
			<: 
			
			
			Exception
			
			
    
			
			
			
			s
			
			::
			
			
			Any
			
			

			
			
			end
			
			


			
			
			
			
			mutable
			
			 
			
			
			struct
			
			 
			
			
			
			



			WorkerPool
	
			
			{
			
			TArgs
			
			}
    
			
			
			
			workerfn
			
			::
			
			
			Any
			
			
    
			
			
			args
			
			::
			
			
			Vector
			
			{
			
			TArgs
			
			}
    
			
			
			useprimary
			
			::
			
			
			Bool
			
			
    
			
			
			state
			
			::
			
			



			PoolState
	
			
			
    
			
			
			ntasks
			
			::
			
			
			
			Threads
			
			.
			
			
			Atomic
			
			{
			
			Int
			
			}

			
			
			end
			
			



			
			
			
			
			function
			
			 
			
			
			



			WorkerPool
	
			
			(
			
			workerfn
			
			, 
			
			
			args
			
			::
			
			
			AbstractVector
			
			{
			
			TArgs
			
			}; 
			
			
			
			
			useprimary
			
			 
			
			= 
			
			false
			
			) 
			
			where 
			
			{
			
			TArgs
			
			}
    
			
			
			
			
			(
			
			
			
			useprimary
			
			 
			
			|| 
			
			
			
			
			Threads
			
			.
			
			
			nthreads
			
			(
			
			) 
			
			> 
			
			1
			
			) 
			
			||
        
			
			
			error
			
			(
			
			"Cannot load data off main thread with only one thread available. Pass `useprimary = true` or start Julia with >1 threads."
			
			)
    
			
			
			
			return
			
			 
			
			
			



			WorkerPool
	
			
			{
			
			TArgs
			
			}
			
			(
			
			workerfn
			
			, 
			
			
			collect
			
			(
			
			args
			
			)
			
			, 
			
			useprimary
			
			, 
			



			Done
	
			
			, 
			
			
			
			
			Threads
			
			.
			
			
			Atomic
			
			{
			
			Int
			
			}
			
			(
			
			0
			
			)
			
			)

			
			
			end
			
			



			
			
			
			
			function
			
			 
			
			



			run
	
			
			(
			
			
			pool
			
			::
			



			WorkerPool
	
			
			)
    
			
			
			
			
			pool
			
			.
			
			
			
			state
			
			 
			
			= 
			
			



			Running
	
			
			
    # set remaining tasks counter.
    
			
			
			
			
			pool
			
			.
			
			
			ntasks
			
			[
			
			] 
			
			= 
			
			
			length
			
			(
			
			
			pool
			
			.
			
			
			args
			
			)

    # watchdog that sends exception to main thread if a worker fails
    
			
			
			
			maintask
			
			 
			
			= 
			
			
			current_task
			
			(
			
			)
    
			
			
			
			@async
			
			 
			
			
			
			
			
			begin
			
			
        
			
			
			
			while
			
			 
			
			
			
			pool
			
			.
			
			
			
			state
			
			 
			
			!== 
			
			



			Done
	
			
			
            
			
			
			
			
			if
			
			 
			
			
			
			pool
			
			.
			
			
			
			state
			
			 
			
			=== 
			
			



			Failed
	
			
			
                
			
			
			
			
			Base
			
			.
			
			
			throwto
			
			(
                    
			
			maintask
			
			,
                    
			
			



			PoolFailedException
	
			
			(
			
			
			"Failed to process all tasks. 
			
			$
			
			(
			
			
			
			pool
			
			.
			
			
			ntasks
			
			[
			
			]
			
			)
			
			 unfinished tasks remaining"
			
			)
			
			,
                
			
			)
            
			
			
			end
			
			
            
			
			
			sleep
			
			(
			
			0.1
			
			)
        
			
			
			end
			
			
    
			
			
			end
			
			

    
			
			
			
			function
			
			 
			
			
			inloop
			
			(
			
			args
			
			)
        #for args in pool.args  # uncomment for debugging
        # task error handling
        
			
			
			
			
			
			pool
			
			.
			
			
			
			state
			
			 
			
			!== 
			
			



			Failed
	
			
			 
			
			|| 
			
			
			error
			
			(
			
			
			"Shutting down worker 
			
			$
			
			(
			
			
			
			Threads
			
			.
			
			
			threadid
			
			(
			
			)
			
			)
			
			"
			
			)
        
			
			
			
			try
			
			
            # execute task
            
			
			
			
			
			pool
			
			.
			
			
			workerfn
			
			(
			
			
			args
			
			...
			
			)
            
			
			
			
			Threads
			
			.
			
			
			atomic_add!
			
			(
			
			
			pool
			
			.
			
			
			ntasks
			
			, 
			
			-1
			
			)
        
			
			
			catch
			
			 
			
			
			e
			
			
            
			
			
			
			println
			
			(
			
			
			stacktrace
			
			(
			
			)
			
			)
            
			
			
			
			@error
			
			 
			
			
			
			
			"Exception while executing task on worker 
			
			$
			
			(
			
			
			
			Threads
			
			.
			
			
			threadid
			
			(
			
			)
			
			)
			
			. Shutting down WorkerPool." 
			
			
			
			e
			
			 
			
			=
                
			
			
			e
			
			 
			
			
			
			stacktrace
			
			 
			
			= 
			
			
			stacktrace
			
			(
			
			) 
			
			
			
			args
			
			 
			
			= 
			
			
			args
			
			
            
			
			
			
			pool
			
			.
			
			
			
			state
			
			 
			
			= 
			
			



			Failed
	
			
			
            
			
			
			rethrow
			
			(
			
			)
        
			
			
			end
			
			
    
			
			
			end
			
			

    
			
			
			
			if
			
			 
			
			
			pool
			
			.
			
			
			
			useprimary
			
			
        
			
			
			
			
			@qthreads
			
			 
			
			
			
			
			
			for
			
			 
			
			
			
			args
			
			 
			
			in 
			
			
			pool
			
			.
			
			
			
			args
			
			
            
			
			
			
			inloop
			
			(
			
			args
			
			)
        
			
			
			end
			
			
    
			
			
			else
			
			
        
			
			
			
			
			@qbthreads
			
			 
			
			
			
			
			
			for
			
			 
			
			
			
			args
			
			 
			
			in 
			
			
			pool
			
			.
			
			
			
			args
			
			
            
			
			
			
			inloop
			
			(
			
			args
			
			)
        
			
			
			end
			
			
    
			
			
			end
			
			

    # Tasks completed successfully
    
			
			
			
			pool
			
			.
			
			
			
			state
			
			 
			
			= 
			
			



			Done
	
			
			

			
			end