티스토리 뷰
목차
반응형
C# 비동기 Async, 소켓 서버, 함수 소스 (Socket network)
C# 비동기 소켓 서버를 포함하여 비동기 함수, 비동기 서버, 비동기 쓰기, 비동기 읽기 등을 다루는 소스입니다. Visual Studio 프로젝트 전체 소스와 링크는 아래를 참조하시고, 예제별 소스 코드는 본문을 참조하세요.
[C# 비동기 Async, 소켓 서버, 함수 소스 (Socket network)] 프로젝트 실행화면
1. Blocking Socket Server
using System;
using System.Threading;
using System.Net;
using System.Net.Sockets;

/// <summary>
/// Blocking reverse-echo server: each accepted connection is handled on a
/// thread-pool thread that blocks while it reads and writes.
/// </summary>
public class Server
{
    /// <summary>
    /// Listens on the given endpoint and queues every accepted client to the thread pool.
    /// The accept loop has no exit condition, so this method does not return normally.
    /// </summary>
    /// <param name="address">Local address to listen on.</param>
    /// <param name="port">TCP port to listen on.</param>
    public void Serve (IPAddress address, int port)
    {
        ThreadPool.SetMinThreads (50, 50);    // Refer to Chapter 21
        ThreadPool.SetMaxThreads (50, 50);    // Refer to Chapter 21

        TcpListener listener = new TcpListener (address, port);
        listener.Start();

        while (true)
        {
            TcpClient c = listener.AcceptTcpClient();
            ThreadPool.QueueUserWorkItem (Accept, c);
        }
    }

    /// <summary>
    /// Thread-pool work item: reads until the 5000-byte buffer is full or the peer
    /// stops sending, reverses the whole buffer, and writes all 5000 bytes back.
    /// </summary>
    /// <param name="clientObject">The accepted <see cref="TcpClient"/>, boxed as object.</param>
    void Accept (object clientObject)
    {
        // This method is the top of the call stack on a pool thread, so nothing above
        // it can catch. A failure on one connection is logged here (same format as the
        // other samples) instead of escaping as an unhandled exception.
        try
        {
            using (TcpClient client = (TcpClient) clientObject)
            using (NetworkStream n = client.GetStream())
            {
                byte[] data = new byte [5000];
                int bytesRead = 0;
                int chunkSize = 1;

                // Read returns 0 once the peer has closed its side; stop then or when full.
                while (bytesRead < data.Length && chunkSize > 0)
                    bytesRead += chunkSize = n.Read (data, bytesRead, data.Length - bytesRead);    // BLOCKS

                Array.Reverse (data);
                n.Write (data, 0, data.Length);     // BLOCKS
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine ("Error: " + ex.Message);
        }
    }
}
2. Non-blocking sockets server
/// <summary>
/// Reverse-echo server that hands every accepted client to a <see cref="ReverseEcho"/>
/// session, which drives its I/O through BeginRead/BeginWrite callbacks.
/// </summary>
public class Server
{
    /// <summary>Accepts clients in an endless loop and queues each one to the thread pool.</summary>
    /// <param name="address">Local address to listen on.</param>
    /// <param name="port">TCP port to listen on.</param>
    public void Serve(IPAddress address, int port)
    {
        ThreadPool.SetMinThreads(50, 50);

        TcpListener tcpListener = new TcpListener(address, port);
        tcpListener.Start();

        for (;;)
        {
            TcpClient accepted = tcpListener.AcceptTcpClient();
            ThreadPool.QueueUserWorkItem(ReverseEcho, accepted);
        }
    }

    // Work-item entry point: starts a fresh session for the queued client.
    void ReverseEcho(object client)
    {
        new ReverseEcho().Begin((TcpClient)client);
    }
}

/// <summary>
/// One client session: fills a 5000-byte buffer via BeginRead, reverses it,
/// sends it back via BeginWrite, then closes the stream and the client.
/// </summary>
class ReverseEcho
{
    TcpClient _client;
    NetworkStream _stream;
    byte[] _data = new byte[5000];
    int _bytesRead = 0;

    /// <summary>Stores the client and its stream, then starts the first read.</summary>
    internal void Begin(TcpClient c)
    {
        try
        {
            _client = c;
            _stream = c.GetStream();
            Read();
        }
        catch (Exception ex)
        {
            ProcessException(ex);
        }
    }

    // Issues reads for as long as they complete synchronously. A read that does
    // not complete synchronously is finished by ReadCallback instead.
    void Read()
    {
        IAsyncResult pending;
        do
        {
            pending = _stream.BeginRead(_data, _bytesRead, _data.Length - _bytesRead, ReadCallback, null);
            if (!pending.CompletedSynchronously)
            {
                return;     // ReadCallback takes over from here.
            }
        }
        while (EndRead(pending));

        Write();
    }

    // Completion callback for BeginRead. Synchronously completed reads are
    // skipped here because the loop in Read() already processes them.
    void ReadCallback(IAsyncResult r)
    {
        try
        {
            if (!r.CompletedSynchronously)
            {
                if (EndRead(r))
                {
                    Read();
                }
                else
                {
                    Write();
                }
            }
        }
        catch (Exception ex)
        {
            ProcessException(ex);
        }
    }

    // Completes a read and adds its size to the running total.
    // Returns true while there is more to read, false otherwise.
    bool EndRead(IAsyncResult r)
    {
        int received = _stream.EndRead(r);
        _bytesRead += received;

        bool nothingReceived = received <= 0;
        bool bufferFull = _bytesRead >= _data.Length;
        return !(nothingReceived || bufferFull);
    }

    // Reverses the entire buffer and starts writing all of it back.
    void Write()
    {
        Array.Reverse(_data);
        _stream.BeginWrite(_data, 0, _data.Length, WriteCallback, null);
    }

    // Completion callback for BeginWrite: finishes the write, then always cleans up.
    void WriteCallback(IAsyncResult r)
    {
        try
        {
            _stream.EndWrite(r);
        }
        catch (Exception ex)
        {
            ProcessException(ex);
        }

        Cleanup();
    }

    // Releases the connection first, then reports the error on the console.
    void ProcessException(Exception ex)
    {
        Cleanup();
        Console.WriteLine("Error: " + ex.Message);
    }

    // Closes the stream and the client if they were ever assigned.
    void Cleanup()
    {
        if (_stream != null)
        {
            _stream.Close();
        }

        if (_client != null)
        {
            _client.Close();
        }
    }
}
3. Non-blocking server: with Tasks
/// <summary>
/// Reverse-echo server that starts a Task-based <see cref="ReverseEcho"/> session
/// for every accepted client.
/// </summary>
public class Server
{
    /// <summary>Accepts clients in an endless loop and starts a session for each.</summary>
    /// <param name="address">Local address to listen on.</param>
    /// <param name="port">TCP port to listen on.</param>
    public void Serve (IPAddress address, int port)
    {
        ThreadPool.SetMinThreads (50, 50);
        TcpListener listener = new TcpListener (address, port);
        listener.Start();
        while (true)
        {
            TcpClient c = listener.AcceptTcpClient();
            new ReverseEcho().BeginAsync (c);
        }
    }
}

/// <summary>
/// One client session built from tasks: reads into a 5000-byte buffer, reverses it,
/// and writes it back. Reads, continuations and the write are attached child tasks
/// of the task started in <see cref="BeginAsync"/>.
/// </summary>
class ReverseEcho
{
    TcpClient _client;
    NetworkStream _stream;
    byte[] _data = new byte [5000];
    int _bytesRead = 0;

    /// <summary>
    /// Starts the session and registers two continuations on the outer task:
    /// one that logs on fault and one that always closes the stream and client.
    /// </summary>
    internal void BeginAsync (TcpClient c)
    {
        _client = c;
        _stream = c.GetStream();

        var task = Task.Factory.StartNew (Read);

        // Set up centralized error handling and cleanup:
        task.ContinueWith (ant =>
            Console.WriteLine ("Error: " + ant.Exception.Message),
            TaskContinuationOptions.OnlyOnFaulted);

        task.ContinueWith (ant =>
        {
            if (_stream != null) _stream.Close();
            if (_client != null) _client.Close();
        });
    }

    // Starts one read as a child task and chains Write to run when it succeeds.
    void Read()
    {
        Task<int> readChunk = Task<int>.Factory.FromAsync (
            _stream.BeginRead, _stream.EndRead,
            _data, _bytesRead, _data.Length - _bytesRead, null,
            TaskCreationOptions.AttachedToParent);

        // Both flags must come from TaskContinuationOptions: ContinueWith takes that
        // enum, and it cannot be OR-ed with a TaskCreationOptions value.
        readChunk.ContinueWith (Write,
            TaskContinuationOptions.NotOnFaulted |
            TaskContinuationOptions.AttachedToParent);
    }

    // Continuation of a read: either starts the next read, or reverses the whole
    // buffer and starts writing all of it back.
    void Write (Task<int> readChunk)
    {
        _bytesRead += readChunk.Result;
        if (readChunk.Result > 0 && _bytesRead < _data.Length)
        {
            Read();       // More data to read!
            return;
        }

        Array.Reverse (_data);
        Task.Factory.FromAsync (_stream.BeginWrite, _stream.EndWrite,
                                _data, 0, _data.Length, null,
                                TaskCreationOptions.AttachedToParent);
    }
}
4. Async methods and iterators
/// <summary>
/// Reverse-echo server whose per-client logic is written as an iterator that
/// yields one task per asynchronous step.
/// </summary>
public class Server
{
    /// <summary>
    /// Accepts clients in an endless loop and runs <see cref="ReverseEcho"/> for each,
    /// logging to the console if the resulting task faults.
    /// </summary>
    /// <param name="address">Local address to listen on.</param>
    /// <param name="port">TCP port to listen on.</param>
    public void Serve (IPAddress address, int port)
    {
        ThreadPool.SetMinThreads (50, 50);
        TcpListener listener = new TcpListener (address, port);
        listener.Start();
        while (true)
        {
            TcpClient c = listener.AcceptTcpClient();

            // NOTE(review): Iterate is not defined in this snippet; presumably the
            // samples' helper that runs each yielded task in turn. Confirm.
            Task.Factory.Iterate (ReverseEcho(c)).ContinueWith (t =>
                Console.WriteLine ("Error: " + t.Exception.Message),
                TaskContinuationOptions.OnlyOnFaulted);
        }
    }

    /// <summary>
    /// Reads until the buffer is full or a read returns no data, then echoes the
    /// received bytes back in reverse order.
    /// </summary>
    /// <param name="client">The accepted client; disposed when the iterator finishes.</param>
    IEnumerable<Task> ReverseEcho (TcpClient client)
    {
        using (client)
        using (var stream = client.GetStream())
        {
            byte[] data = new byte[Program.MessageLength];
            int bytesRead = 0;
            while (true)
            {
                // ReadASync is an extension method in the samples.
                Task<int> readChunk = stream.ReadAsync (data, bytesRead, data.Length - bytesRead);
                yield return readChunk;
                bytesRead += readChunk.Result;
                if (readChunk.Result <= 0 || bytesRead >= data.Length)
                    break;
            }

            // Only bytesRead bytes are sent back, so only that range is reversed.
            // Reversing the whole buffer would move the unused zero-filled tail to
            // the front whenever the message is shorter than the buffer.
            Array.Reverse (data, 0, bytesRead);
            yield return stream.WriteAsync (data, 0, bytesRead);
        }
    }
}
5. Writing async methods
/// <summary>
/// Exposes the reverse-echo operation through a Begin/End method pair.
/// </summary>
public class MessagingServices
{
    /// <summary>Starts a reverse-echo session on <paramref name="client"/>.</summary>
    /// <param name="client">Connected client to read from and write to.</param>
    /// <param name="callback">Invoked once the operation has finished; may be null.</param>
    /// <param name="userState">Opaque value returned through <see cref="IAsyncResult.AsyncState"/>.</param>
    /// <returns>The handle to pass to <see cref="EndReverseEcho"/>.</returns>
    public static IAsyncResult BeginReverseEcho (TcpClient client,
                                                 AsyncCallback callback,
                                                 object userState)
    {
        var re = new ReverseEcho();
        re.Begin (client, callback, userState);
        return re;
    }

    /// <summary>
    /// Waits for the operation to finish, rethrows any recorded error, and returns the buffer.
    /// </summary>
    /// <param name="r">The value returned by <see cref="BeginReverseEcho"/>.</param>
    public static byte[] EndReverseEcho (IAsyncResult r)
    {
        return ((ReverseEcho)r).End();
    }
}

/// <summary>
/// Task-based reverse-echo session that also acts as its own <see cref="IAsyncResult"/>.
/// </summary>
class ReverseEcho : IAsyncResult
{
    TcpClient _client;
    NetworkStream _stream;
    object _userState;
    ManualResetEvent _waitHandle = new ManualResetEvent (false);
    int _bytesRead = 0;
    byte[] _data = new byte [5000];
    Exception _exception;

    internal ReverseEcho() { }

    // IAsyncResult members:

    public object AsyncState { get { return _userState; } }
    public WaitHandle AsyncWaitHandle { get { return _waitHandle; } }
    public bool CompletedSynchronously { get { return false; } }
    public bool IsCompleted
    {
        // A zero-timeout wait reports whether the event is set, without blocking.
        get { return _waitHandle.WaitOne (0, false); }
    }

    /// <summary>
    /// Starts the read/reverse/write sequence. When the outer task completes, the
    /// continuation records any error, closes the stream, signals the wait handle
    /// and invokes <paramref name="callback"/>.
    /// </summary>
    internal void Begin (TcpClient c, AsyncCallback callback, object state)
    {
        _client = c;
        _userState = state;
        _stream = _client.GetStream();

        Task.Factory.StartNew (Read).ContinueWith (ant =>
        {
            _exception = ant.Exception;   // In case an exception occurred.

            if (_stream != null)
                try { _stream.Close(); }
                catch (Exception ex) { _exception = ex; }

            _waitHandle.Set();
            if (callback != null) callback (this);
        });
    }

    /// <summary>Wait for completion + rethrow any error.</summary>
    /// <returns>The 5000-byte session buffer.</returns>
    internal byte[] End()
    {
        AsyncWaitHandle.WaitOne();
        if (_exception != null) throw _exception;
        return _data;
    }

    // Starts one read and chains ContinueRead to run when it succeeds.
    void Read()
    {
        // The read is attached to the enclosing task so that the outer task started
        // in Begin does not complete, and a read fault is not lost, before the I/O
        // has finished.
        Task<int> readChunk = Task<int>.Factory.FromAsync (
            _stream.BeginRead, _stream.EndRead,
            _data, _bytesRead, _data.Length - _bytesRead, null,
            TaskCreationOptions.AttachedToParent);

        readChunk.ContinueWith (ContinueRead,
            TaskContinuationOptions.NotOnFaulted |
            TaskContinuationOptions.AttachedToParent);
    }

    // Continuation of a read: either starts the next read, or reverses the whole
    // buffer and starts writing all of it back.
    void ContinueRead (Task<int> readChunk)
    {
        _bytesRead += readChunk.Result;
        if (readChunk.Result > 0 && _bytesRead < _data.Length)
        {
            Read();       // More data to read!
            return;
        }

        Array.Reverse (_data);

        // Attached for the same reason as the read: the continuation in Begin closes
        // the stream, which must not happen while this write is still in flight.
        Task.Factory.FromAsync (_stream.BeginWrite, _stream.EndWrite,
                                _data, 0, _data.Length, null,
                                TaskCreationOptions.AttachedToParent);
    }
}
C# 비동기 Async, 소켓 서버, 함수 소스 (Socket network)
반응형