using CommonLang.Log;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace CommonLang.Concurrent
{
    /// <summary>
    /// Executor backed by a fixed set of background threads that drain a shared
    /// FIFO queue of tasks. Delayed and periodic work is driven by
    /// <see cref="System.Threading.Timer"/> instances that enqueue the task each
    /// time they fire.
    /// </summary>
    public class ThreadExecutor : Executor
    {
        private AtomicLong idgen = new AtomicLong(0);
        private Logger log;
        private Thread[] processors;

        // Pending tasks. This object is also the monitor used for all
        // wait/pulse signalling between producers, workers and Shutdown().
        private Queue<WorkTask> workQueue = new Queue<WorkTask>();

        // Only read and written while holding the workQueue lock, so that a
        // worker can never miss the shutdown wake-up.
        private bool shutdown = false;

        /// <summary>
        /// Creates the executor and immediately starts
        /// <paramref name="processorCount"/> background worker threads.
        /// </summary>
        /// <param name="processorCount">Number of worker threads to start.</param>
        public ThreadExecutor(int processorCount)
        {
            this.log = LoggerFactory.GetLogger("ThreadExecutor");
            this.processors = new Thread[processorCount];
            for (int i = 0; i < processorCount; i++)
            {
                this.processors[i] = new Thread(internal_run);
                this.processors[i].IsBackground = true;
                this.processors[i].Start();
            }
        }

        /// <summary>
        /// Cancels every task still waiting in the queue, tells the workers to
        /// stop and blocks until all of them have exited. A task that is
        /// already running is allowed to finish first.
        /// </summary>
        public void Shutdown()
        {
            log.Debug("processors shutting down ...");
            lock (workQueue)
            {
                shutdown = true;
                foreach (WorkTask task in workQueue)
                {
                    task.Cancel();
                }
                // Wake every worker parked in Monitor.Wait; without this the
                // Join() calls below would block forever on idle workers.
                Monitor.PulseAll(workQueue);
            }
            for (int i = 0; i < processors.Length; i++)
            {
                this.processors[i].Join();
            }
            log.Debug("processors shutdown !");
        }

        /// <summary>
        /// Worker loop: waits for a task, runs it, and repeats until shutdown
        /// is requested. Exceptions thrown by a task are logged and do not
        /// terminate the worker.
        /// </summary>
        private void internal_run()
        {
            try
            {
                while (true)
                {
                    WorkTask task;
                    lock (workQueue)
                    {
                        // The flag is checked under the same lock that
                        // Shutdown() holds while setting it, so the wake-up
                        // cannot slip in between the check and the wait.
                        while (!shutdown && workQueue.Count == 0)
                        {
                            Monitor.Wait(workQueue);
                        }
                        if (shutdown)
                        {
                            return;
                        }
                        task = workQueue.Dequeue();
                    }
                    try
                    {
                        task.Run();
                    }
                    catch (Exception err)
                    {
                        log.Error(err.Message, err);
                    }
                }
            }
            catch (Exception err)
            {
                log.Error(err.Message, err);
            }
        }

        /// <summary>
        /// Appends a task to the queue and wakes one waiting worker.
        /// </summary>
        private void internal_queue_task(WorkTask task)
        {
            lock (workQueue)
            {
                workQueue.Enqueue(task);
                Monitor.Pulse(workQueue);
            }
        }

        /// <summary>
        /// Queues <paramref name="command"/> to run on a worker thread.
        /// </summary>
        /// <param name="command">Action to run.</param>
        /// <returns>Handle that can be used to cancel the task before it starts.</returns>
        public Future Execute(Action command)
        {
            WorkTask task = new WorkTask(this, command);
            internal_queue_task(task);
            return task;
        }

        /// <summary>
        /// Queues <paramref name="r"/> once, after <paramref name="delay"/>
        /// milliseconds (the delay is handed to a <see cref="Timer"/>).
        /// </summary>
        /// <param name="r">Action to run.</param>
        /// <param name="delay">Timer due time.</param>
        /// <returns>Handle that can be used to cancel the task.</returns>
        public Future Schedule(Action r, int delay)
        {
            return new ScheduledWorkTask(this, r, delay, System.Threading.Timeout.Infinite);
        }

        /// <summary>
        /// Queues <paramref name="r"/> after <paramref name="initial"/>
        /// milliseconds and then again every <paramref name="period"/>
        /// milliseconds until cancelled.
        /// </summary>
        /// <param name="r">Action to run.</param>
        /// <param name="initial">Timer due time for the first run.</param>
        /// <param name="period">Timer period between subsequent runs.</param>
        /// <returns>Handle that can be used to cancel the task.</returns>
        public Future ScheduleAtFixedRate(Action r, int initial, int period)
        {
            return new ScheduledWorkTask(this, r, initial, period);
        }

        /// <summary>
        /// A queued unit of work together with its cancellation flag.
        /// </summary>
        private class WorkTask : Future
        {
            internal readonly ThreadExecutor exec;
            internal readonly long id;
            internal readonly Action action;
            internal bool is_cancel = false;

            // Private lock object: instances are handed out to callers as
            // Future, so locking on `this` would let outside code interfere.
            protected readonly object stateLock = new object();

            public WorkTask(ThreadExecutor exec, Action action)
            {
                this.exec = exec;
                this.id = exec.idgen.IncrementAndGet();
                this.action = action;
            }

            /// <summary>
            /// Marks the task as cancelled so that it is skipped when a worker
            /// picks it up. Always returns true.
            /// </summary>
            public bool Cancel()
            {
                lock (stateLock)
                {
                    this.is_cancel = true;
                    return true;
                }
            }

            /// <summary>Identifier taken from the executor's id generator.</summary>
            public long ID
            {
                get { return id; }
            }

            /// <summary>True once <see cref="Cancel"/> has been called.</summary>
            public bool IsCancelled
            {
                get
                {
                    lock (stateLock)
                    {
                        return is_cancel;
                    }
                }
            }

            /// <summary>
            /// Invokes the action unless the task has been cancelled.
            /// </summary>
            internal virtual void Run()
            {
                lock (stateLock)
                {
                    if (is_cancel)
                    {
                        return;
                    }
                }
                action();
            }
        }

        /// <summary>
        /// Task that is put on the executor queue by a timer, either once or
        /// periodically.
        /// </summary>
        private class ScheduledWorkTask : WorkTask
        {
            internal readonly int initial;
            internal readonly int period;
            internal Timer timer;

            public ScheduledWorkTask(ThreadExecutor exec, Action command, int initial, int period)
                : base(exec, command)
            {
                this.initial = initial;
                this.period = period;
                // Create the timer disarmed and only start it once the field
                // is assigned; otherwise a short/zero due time could fire the
                // callback while this.timer is still null.
                this.timer = new Timer(timeout, command, Timeout.Infinite, Timeout.Infinite);
                this.timer.Change(initial, period);
            }

            /// <summary>
            /// Timer callback: disposes the timer when the task is cancelled or
            /// one-shot, and enqueues the task unless it was cancelled.
            /// </summary>
            /// <param name="state">Timer state object (the command); not used here.</param>
            private void timeout(object state)
            {
                lock (stateLock)
                {
                    if (is_cancel)
                    {
                        this.timer.Dispose();
                        return;
                    }
                    if (period == Timeout.Infinite)
                    {
                        // One-shot schedule: the timer is no longer needed.
                        this.timer.Dispose();
                    }
                }
                exec.internal_queue_task(this);
            }
        }
    }

    /// <summary>
    /// A <see cref="ThreadExecutor"/> with exactly one worker thread.
    /// </summary>
    public class SingleThreadExecutor : ThreadExecutor
    {
        public SingleThreadExecutor() : base(1) { }
    }
}