본문 바로가기
C++ 200제/코딩 IT 정보

C# 비동기 Async, 소켓 서버, 함수 소스 (Socket network)

by vicddory 2017. 8. 18.

C# 비동기 Async, 소켓 서버, 함수 소스 (Socket network)


C# 비동기 소켓 서버를 포함한 비동기 함수, 비동기 서버, 비동기 쓰기, 비동기 읽기 등의 소스입니다. 전체 소스(Visualstudio)와 링크는 아래를 참조하시고, 전체 소스는 본문을 참조하세요.



C# 비동기 Async, 소켓 서버, 함수 소스[C# 비동기 Async, 소켓 서버, 함수 소스 (Socket network)] 프로젝트 실행화면

1. Blocking Socker Server


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
using System;
using System.Threading;
using System.Net;
using System.Net.Sockets;
 
public class Server
{
 public void Serve (IPAddress address, int port)
 {
    ThreadPool.SetMinThreads (5050);    // Refer to Chapter 21
    ThreadPool.SetMaxThreads (5050);    // Refer to Chapter 21
    TcpListener listener = new TcpListener (address, port);
    listener.Start();
    while (true)
    {
      TcpClient c = listener.AcceptTcpClient();
      ThreadPool.QueueUserWorkItem (Accept, c);
    }
  }
 
  void Accept (object clientObject)
  {
    using (TcpClient client = (TcpClient) clientObject)
    using (NetworkStream n = client.GetStream())
    {
      byte[] data = new byte [5000];
 
      int bytesRead = 0int chunkSize = 1;
      while (bytesRead < data.Length && chunkSize > 0)
        bytesRead +=
          chunkSize = n.Read
            (data, bytesRead, data.Length - bytesRead);    // BLOCKS
 
      Array.Reverse (data);
      n.Write (data, 0, data.Length);                      // BLOCKS
    }
  }
}
cs


2. Non-blocking sockets server


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
public class Server
{
  public void Serve (IPAddress address, int port)
  {
    ThreadPool.SetMinThreads (5050);
    TcpListener listener = new TcpListener (address, port);
    listener.Start();
    while (true)
    {
      TcpClient c = listener.AcceptTcpClient();
      ThreadPool.QueueUserWorkItem (ReverseEcho, c);
    }
  }
 
  void ReverseEcho (object client)
  {
    new ReverseEcho().Begin ((TcpClient)client);
  }
}
 
class ReverseEcho
{
  TcpClient _client;
  NetworkStream _stream;
  byte[] _data = new byte [5000];
  int _bytesRead = 0;
 
  internal void Begin (TcpClient c)
  {
    try
    {
      _client = c;
      _stream = c.GetStream();
      Read();
    }
    catch (Exception ex) { ProcessException (ex); }
  }
 
  void Read()            // Read in a nonblocking fashion.
  {
    while (true)
    {
      IAsyncResult r = _stream.BeginRead
       (_data, _bytesRead, _data.Length - _bytesRead, ReadCallback, null);
 
      // This will nearly always return in the next line:
      if (!r.CompletedSynchronously) return;   // Handled by callback
      if (!EndRead (r)) break;
    }
    Write();
  }
 
  void ReadCallback (IAsyncResult r)
  {
    try
    {
      if (r.CompletedSynchronously) return;
      if (EndRead (r))
      {
        Read();       // More data to read!
        return;
      }
      Write();
    }
    catch (Exception ex) { ProcessException (ex); }
  }
 
  bool EndRead (IAsyncResult r)   // Returns false if there’s no more data
  {
    int chunkSize = _stream.EndRead (r);
    _bytesRead += chunkSize;
    return chunkSize > 0 && _bytesRead < _data.Length;   // More to read
  }
 
  void Write()
  {
    Array.Reverse (_data);
    _stream.BeginWrite (_data, 0, _data.Length, WriteCallback, null);
  }
 
  void WriteCallback (IAsyncResult r)
  {
    try { _stream.EndWrite (r); }
    catch (Exception ex) { ProcessException (ex); }
    Cleanup();
  }
 
  void ProcessException (Exception ex)
  {
    Cleanup();
    Console.WriteLine ("Error: " + ex.Message);
  }
 
  void Cleanup()
  {
    if (_stream != null) _stream.Close();
    if (_client != null) _client.Close();
  }
}
cs

3. Non-blocking server: with Tasks


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
public class Server
{
  public void Serve (IPAddress address, int port)
  {
    ThreadPool.SetMinThreads (5050);
    TcpListener listener = new TcpListener (address, port);
    listener.Start();
    while (true)
    {
      TcpClient c = listener.AcceptTcpClient();
      new ReverseEcho().BeginAsync (c);
    }
  }
}
 
class ReverseEcho
{
  TcpClient _client;
  NetworkStream _stream;
  byte[] _data = new byte [5000];
  int _bytesRead = 0;
 
  internal void BeginAsync (TcpClient c)
  {
    _client = c;
    _stream = c.GetStream();
 
    var task = Task.Factory.StartNew (Read);
 
    // Set up centralized error handling and cleanup:
 
    task.ContinueWith (ant => 
      Console.WriteLine ("Error: " + ant.Exception.Message),
      TaskContinuationOptions.OnlyOnFaulted);                
 
    task.ContinueWith (ant =>
    {
      if (_stream != null) _stream.Close();
      if (_client != null) _client.Close();
    });
  }
 
  void Read()    // This will create a child task.
  {
    Task<int> readChunk = Task<int>.Factory.FromAsync (
      _stream.BeginRead, _stream.EndRead,
      _data, _bytesRead, _data.Length - _bytesRead, null,
      TaskCreationOptions.AttachedToParent);
 
    readChunk.ContinueWith (Write, TaskContinuationOptions.NotOnFaulted 
                                 | TaskCreationOptions.AttachedToParent);
  }
 
  void Write (Task<int> readChunk)
  {
    _bytesRead += readChunk.Result;
    if (readChunk.Result > 0 && _bytesRead < _data.Length)
    {
      Read();       // More data to read!
      return;
    }
    Array.Reverse (_data);
    Task.Factory.FromAsync (_stream.BeginWrite, _stream.EndWrite,
                            _data, 0, _data.Length, null,
                            TaskCreationOptions.AttachedToParent);
  }
}
cs

4. Async methods and iterators


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class Server
{
  public void Serve (IPAddress address, int port)
  {
    ThreadPool.SetMinThreads (5050);
    TcpListener listener = new TcpListener (address, port);
    listener.Start();
    while (true)
    {
      TcpClient c = listener.AcceptTcpClient();
      Task.Factory.Iterate (ReverseEcho(c)).ContinueWith (t =>
        Console.WriteLine ("Error: " + t.Exception.Message),
        TaskContinuationOptions.OnlyOnFaulted);
    }
  }
 
  IEnumerable<Task> ReverseEcho (TcpClient client)
  {
    using (client)
    using (var stream = client.GetStream())
    {
      byte[] data = new byte[Program.MessageLength];
      int bytesRead = 0;
      while (true)
      {
        // ReadASync is an extension method in the samples.
        Task<int> readChunk = stream.ReadAsync
          (data, bytesRead, data.Length - bytesRead);
        yield return readChunk;
        bytesRead += readChunk.Result;
        if (readChunk.Result <= 0 || bytesRead >= data.Length)
          break;
      }
      Array.Reverse(data);
      yield return stream.WriteAsync (data, 0, bytesRead);
    }
  }
}
cs


5. Writing async methods


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
public class MessagingServices
{
  public static IAsyncResult BeginReverseEcho (TcpClient client,
                                               AsyncCallback callback,
                                               object userState)
  {
    var re = new ReverseEcho();
    re.Begin (client, callback, userState);
    return re;
  }
 
  public static byte[] EndReverseEcho (IAsyncResult r)
  {
    return ((ReverseEcho)r).End();
  }
}
 
class ReverseEcho : IAsyncResult
{
  TcpClient _client;
  NetworkStream _stream;
  object _userState;
  ManualResetEvent _waitHandle = new ManualResetEvent (false);
  int _bytesRead = 0;
  byte[] _data = new byte [5000];
  Exception _exception;
 
  internal ReverseEcho() { }
 
  // IAsyncResult members:
 
  public object AsyncState { get { return _userState; } }
  public WaitHandle AsyncWaitHandle { get { return _waitHandle; } }
  public bool CompletedSynchronously { get { return false; } }
  public bool IsCompleted
  {
    get { return _waitHandle.WaitOne (0false); }
  }
 
  internal void Begin (TcpClient c, AsyncCallback callback, object state)
  {
    _client = c;
    _userState = state;
    _stream = _client.GetStream();
    
    Task.Factory.StartNew (Read).ContinueWith (ant =>
    {
      _exception = ant.Exception;   // In case an exception occurred.
 
      if (_stream != null)
        try { _stream.Close(); }
        catch (Exception ex) { _exception = ex; };
 
      _waitHandle.Set();
 
      if (callback != null) callback (this);
    });
  }
 
  internal byte[] End()     // Wait for completion + rethrow any error.
  {
    AsyncWaitHandle.WaitOne();
    if (_exception != nullthrow _exception;
    return _data;
  }
 
  void Read()
  {
    Task<int> readChunk = Task<int>.Factory.FromAsync (
      _stream.BeginRead, _stream.EndRead,
      _data, _bytesRead, _data.Length - _bytesRead, null);
 
    readChunk.ContinueWith (ContinueRead,
                            TaskContinuationOptions.NotOnFaulted
                          | TaskContinuationOptions.AttachedToParent);
  }
 
  void ContinueRead (Task<int> readChunk)
  {
    _bytesRead += readChunk.Result;
    if (readChunk.Result > 0 && _bytesRead < _data.Length)
    {
      Read();       // More data to read!
      return;
    }
    Array.Reverse (_data);
    Task.Factory.FromAsync (_stream.BeginWrite, _stream.EndWrite,
                            _data, 0, _data.Length, null);
  }  
}
cs


C# 비동기 Async, 소켓 서버, 함수 소스 (Socket network)

댓글