软件信息网 移动端开发 用C#实现线程池

用C#实现线程池

NET 6 环境开发 实现 线程数量,任务队列,非核心线程,及核心线程活跃时间的管理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
namespace CustomThreadPool;
/// <summary>
/// 线程池类
/// </summary>
public class ThreadPoolExecutor
{
    /// <summary>
    /// 核心线程的任务队列
    /// </summary>
    private readonly Queue<WorkTask> tasks = new Queue<WorkTask>();
    /// <summary>
    /// 最大核心线程数
    /// </summary>
    private int coreThreadCount;
    /// <summary>
    /// 最大非核心线程数
    /// </summary>
    private int noneCoreThreadCount;
    /// <summary>
    /// 当前运行的核心线程的数量
    /// </summary>
    private int runCoreThreadCount;
    /// <summary>
    /// 当前运行的非核心线程的数量
    /// </summary>
    private int runNoneCoreThreadCount;
    /// <summary>
    /// 核心线程队列的最大数
    /// </summary>
    private int maxQueueCount;
    /// <summary>
    /// 当核心线程空闲时最大活跃时间
    /// </summary>
    private int keepAliveTimeout;
    /// <summary>
    /// 设置是否为后台线程
    /// </summary>
    private bool isBackground;
    private ThreadPoolExecutor() { }
    /// <summary>
    ///
    /// </summary>
    /// <param name="CoreThreadCount">核心线程数</param>
    /// <param name="TotalThreadCount">总线程数</param>
    /// <param name="IsBackground">是否为后台线程</param>
    /// <param name="QueueCount">核心队列的最大数</param>
    /// <param name="KeepAliveTimeout">当核心线程空闲时最大活跃时间</param>
    /// <exception cref="ArgumentOutOfRangeException"></exception>
    /// <exception cref="ArgumentException"></exception>
    public ThreadPoolExecutor(int CoreThreadCount = 5, int TotalThreadCount = 10, bool IsBackground = trueint QueueCount = 200, int KeepAliveTimeout = 0)
    {
        if (CoreThreadCount < 1) throw new ArgumentOutOfRangeException(nameof(CoreThreadCount), CoreThreadCount, null);
        if (TotalThreadCount < CoreThreadCount) throw new ArgumentException($"{nameof(TotalThreadCount)}:{TotalThreadCount} must be greater than {nameof(CoreThreadCount)}:{CoreThreadCount}");
        if (QueueCount < 0) throw new ArgumentOutOfRangeException(nameof(QueueCount), QueueCount, null);
        if (KeepAliveTimeout < 0) throw new ArgumentOutOfRangeException(nameof(KeepAliveTimeout), KeepAliveTimeout, null);
        coreThreadCount = CoreThreadCount;
        noneCoreThreadCount = TotalThreadCount - CoreThreadCount;
        keepAliveTimeout = KeepAliveTimeout;
        maxQueueCount = QueueCount;
        isBackground = IsBackground;
    }
    /// <summary>
    /// 执行任务
    /// </summary>
    /// <param name="task">一个自定义任务</param>
    /// <exception cref="ArgumentNullException">任务为null时,抛出该错误</exception>
    /// <exception cref="NotSupportedException">当核心任务队列已满且非核心线程最大数为0时抛出该错误</exception>
    public void QueueTask(WorkTask task)
    {
        if (task == nullthrow new ArgumentNullException(nameof(task));
        lock (tasks)
        {
            tasks.Enqueue(task);
            if (tasks.Count <= maxQueueCount)
            {
                if (runCoreThreadCount < coreThreadCount)
                {
                    ++runCoreThreadCount;
                    Run(true);
                }
            }
            else
            {
                if (noneCoreThreadCount > 0 && runNoneCoreThreadCount < noneCoreThreadCount)
                {
                    ++runNoneCoreThreadCount;
                    Run(false);
                }
            }
        }
    }
    private void Run(bool isCore)
    {
        Tuple<intbool> state = new(keepAliveTimeout, isCore);
        Thread thread = new(t => Excute(t))
        {
            Name = Guid.NewGuid().ToString("D"),
            IsBackground = isBackground
        };
        thread.Start(state);
    }
    private void Excute(object? state)
    {
        if (state == nullreturn;
        var parameter = (Tuple<intbool>)state;
        bool first = true;
        DateTime firstTime = DateTime.Now;
        while (true)
        {
            WorkTask? item = null;
            lock (tasks)
            {
                if (tasks.Count > 0)
                {
                    first = true;
                    item = tasks.Dequeue();
                }
                else
                {
                    if (parameter.Item2)
                    {
                        if (first)
                        {
                            firstTime = DateTime.Now;
                            first = false;
                        }
                        if ((DateTime.Now - firstTime).TotalMilliseconds > parameter.Item1)
                        {
                            --runCoreThreadCount;
                            break;
                        }
                    }
                    else
                    {
                        --runNoneCoreThreadCount;
                        break;
                    }
                }
            }
            item?.Runsynchronous();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
namespace CustomThreadPool;
/// <summary>
/// 包装的任务类
/// </summary>
public class WorkTask
{
    public static WorkTaskFactory Factory { getprivate set; } = new WorkTaskFactory();
    /// <summary>
    /// 任务运行结束时触发该事件
    /// </summary>
    public event Action<WorkTask>? TaskCompleted;
    /// <summary>
    /// 任务ID
    /// </summary>
    private static int _id = 0;
    /// <summary>
    /// 委托给任务不带执行参数的代码
    /// </summary>
    private readonly Action? action;
    /// <summary>
    /// 委托给任务执行的带输入参数代码
    /// </summary>
    private readonly Action<object?>? actionWithParamter;
    /// <summary>
    /// 线程间的同步事件
    /// </summary>
    public AutoResetEvent WaitHandle { getprotected set; } = new AutoResetEvent(false);
    /// <summary>
    /// 执行代码的参数
    /// </summary>
    public object? State { getprotected set; }
    /// <summary>
    /// 接收任务抛出的异常
    /// </summary>
    public WorkTaskException? Exception { getprotected set; }
    /// <summary>
    /// 任务是否完成标志
    /// </summary>
    public bool IsCompleted { getprotected set; } = false;
    /// <summary>
    /// 任务知否有异常
    /// </summary>
    public bool IsFaulted { getprotected set; } = false;
    /// <summary>
    /// 任务状态
    /// </summary>
    public WorkTaskStatus Status { getprotected set; } = WorkTaskStatus.Created;
    public int Id { get return Interlocked.Increment(ref _id); } }
    protected WorkTask() { }
    protected void OnTaskCompleted(WorkTask sender)
    {
        TaskCompleted?.Invoke(sender);
    }
    public WorkTask(Action action)
    {
        this.action = action ?? throw new ArgumentNullException(nameof(action));
    }
    public WorkTask(Action<object?> action, object state)
    {
        actionWithParamter = action ?? throw new ArgumentNullException(nameof(action));
        this.State = state;
    }
    /// <summary>
    /// 任务的同步方法
    /// </summary>
    public virtual void Runsynchronous()
    {
        if (Status != WorkTaskStatus.Created) return;
        Status = WorkTaskStatus.Running;
        try
        {
            action?.Invoke();
            actionWithParamter?.Invoke(State);
        }
        catch (Exception ex)
        {
            Exception = new WorkTaskException(ex.Message, ex);
            IsFaulted = true;
        }
        finally
        {
            OnTaskCompleted(this);
            WaitHandle.Set();
            IsCompleted = true;
            Status = WorkTaskStatus.RanToCompleted;
        }
    }
    /// <summary>
    /// 通过调用线程执行的方法
    /// </summary>
    public void Start()
    {
        Factory.ThreadPoolExcutor?.QueueTask(this);
    }
    /// <summary>
    /// 通过调用线程执行的方法
    /// </summary>
    /// <param name="executor">线程池管理类</param>
    public void Start(ThreadPoolExecutor executor)
    {
        executor.QueueTask(this);
    }
    /// <summary>
    /// 执行一组任务并等待所有任务完成。
    /// </summary>
    /// <param name="tasks">一组任务</param>
    /// <returns>所有任务是否都接收到完成的信号。</returns>
    public static bool WaitAll(WorkTask[] tasks)
    {
        var result = true;
        foreach (var task in tasks)
        {
            result = result && task.WaitHandle.WaitOne();
        }
        return result;
    }
    /// <summary>
    /// 执行一组任务并等待任意一个任务完成。
    /// </summary>
    /// <param name="tasks">一组任务</param>
    /// <returns>返回已完成任务的索引</returns>
    public static int WaitAny(WorkTask[] tasks)
    {
        var index = new Random().Next(0, tasks.Length - 1);
        tasks[index].WaitHandle.WaitOne();
        return index;
    }
}
/// <summary>
/// 具有返回类型的任务
/// </summary>
/// <typeparam name="TResult"></typeparam>
public class WorkTask<TResult> : WorkTask
{
    private readonly Func<TResult>? func;
    private readonly Func<object?, TResult>? funcWithParameter;
    protected TResult? _result = default(TResult);
    public TResult? Result
    {
        get
        {
            if (!isSetSignal)
                WaitHandle.WaitOne();
            return _result;
        }
    }
    public WorkTask(Func<TResult> func)
    {
        this.func = func ?? throw new ArgumentNullException(nameof(func));
    }
    public WorkTask(Func<object?, TResult> func, object? state)
    {
        this.funcWithParameter = func ?? throw new ArgumentNullException(nameof(func));
        this.State = state;
    }
    private bool isSetSignal = false;
    public override void Runsynchronous()
    {
        if (Status != WorkTaskStatus.Created) return;
        Status = WorkTaskStatus.Running;
        try
        {
            if (func != null) _result = func();
            if (funcWithParameter != null) _result = funcWithParameter(State);
        }
        catch (Exception ex)
        {
            Exception = new WorkTaskException(ex.Message, ex);
            IsFaulted = true;
        }
        finally
        {
            OnTaskCompleted(this);
            isSetSignal = WaitHandle.Set();
            Status = WorkTaskStatus.RanToCompleted;
            IsCompleted = true;
        }
    }
}
public class WorkTaskException : Exception
{
    public WorkTaskException()
    {
    }
    public WorkTaskException(string Message)
        base(Message)
    {
    }
    public WorkTaskException(string Message, Exception InnerException)
        base(Message, InnerException)
    {
    }
}
public enum WorkTaskStatus
{
    /// <summary>
    /// 已创建
    /// </summary>
    Created = 0,
    /// <summary>
    /// 正在运行
    /// </summary>
    Running = 1,
    /// <summary>
    /// 已完成
    /// </summary>
    RanToCompleted = 2,
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
namespace CustomThreadPool;
public class WorkTaskFactory
{
    public ThreadPoolExecutor? ThreadPoolExcutor { getprivate set; }
    public WorkTaskFactory(ThreadPoolExecutor excutor)
    {
        ThreadPoolExcutor = excutor;
    }
    public WorkTaskFactory()
    this(new ThreadPoolExecutor(5, 10))
    {
    }
    public WorkTask StartNew(Action action, ThreadPoolExecutor? executor = null)
    {
        WorkTask task = new WorkTask(action);
        ThreadPoolExcutor = executor ?? ThreadPoolExcutor;
        ThreadPoolExcutor?.QueueTask(task);
        return task;
    }
    public WorkTask<TResult> StartNew<TResult>(Func<object?, TResult> func, object? state, ThreadPoolExecutor? executor = null)
    {
        WorkTask<TResult> task = new WorkTask<TResult>(func, state);
        ThreadPoolExcutor = executor ?? ThreadPoolExcutor;
        ThreadPoolExcutor?.QueueTask(task);
        return task;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
namespace CustomThreadPool;
using System.Threading;
using System.Text;
using System;
using System.Diagnostics;
using System.Reflection.Emit;
class Program
{
    static void Main(string[] args)
    {
        int count = 5;
        ThreadPoolExecutor poolExcutor = new(5, 6, QueueCount: 5, KeepAliveTimeout: 2000);
        WorkTask<int?>[] workTasks = new WorkTask<int?>[count];
        for (int i = 0; i < count; i++) workTasks[i] = WorkTask.Factory.StartNew(t => Action(t), state: i, executor: poolExcutor);
        WorkTask<int> task = WorkTask.Factory.StartNew(t =>
        {
            Thread.Sleep(100);
            Console.WriteLine("start thread");
            return 100;
        }, state: null, executor: poolExcutor);
        Console.WriteLine("start main");
        WorkTask.WaitAll(workTasks);
        Console.WriteLine(task.Result);
        Console.WriteLine(workTasks.Sum(t => t.Result));
    }
    private static int? Action(object? t)
    {
        Thread.Sleep(2000);
        Console.WriteLine($"Task Id:{Environment.CurrentManagedThreadId},Parameter:{t}");
        return t == null default(int?) : (int)t + 1;
    }
}

调用结果

 

作者: 软件定制开发

李铁牛,一直致力于企业客户软件定制开发,计算机专业毕业后,一直从事于互联网产品开发到现在。系统开发,系统源码:15889726201
上一篇
下一篇
联系我们

联系我们

15889726201

在线咨询: QQ交谈

邮箱: 187395037@qq.com

工作时间:周一至周五,9:00-17:30,节假日休息

关注微信
微信扫一扫关注我们

微信扫一扫关注我们

关注微博
返回顶部