Akka(43): Http:SSE-Server Sent Event – 服务端主推消息

     3.HTTP模消息标头解析:

         
在HTTP中,请求与应消息,以及消息内容我,都足以使称为标头的额外字段,包含重复多的音信。

       (1).标头分类:

标头名称 描述 HTTP模型标头容器类
User-Agent 为请求提供扩展信息,描述产生这个请求的应用程序 HttpRequestHeaders
Server 为响应提供关于源服务器软件的扩展信息 HttpResponseHeaders
Content-Type 定义请求或响应有效载荷正文中,资源表示使用的媒体类型 HttpContentHeaders

       (2).HttpHeaders抽象类分析:

名称 描述
Add 添加指定的标头及其值到 HttpHeaders 集合中。
TryAddWithoutValidation 返回一个值,该值指示指定标头及其值是否已添加到HttpHeaders 集合,而未验证所提供的信息。
Clear 从 HttpHeaders 集合中移除所有标头。
Remove 从HttpHeaders集合中移除指定的标头。
GetValues 返回存储在HttpHeaders 集合中所有指定标头的标头值。
Contains 如果指定标头存在于 HttpHeaders 集合则返回。
ToString 返回表示当前 HttpHeaders对象的字符串。

     
 HttpHeaders是一个抽象类,HttpRequestHeaders、HttpResponseHeaders、HttpContentHeaders三只类似继承了此类。接下来我们来打探一下Add()方法:

[__DynamicallyInvokable]
public void Add(string name, string value)
{
    HeaderStoreItemInfo info;
    bool flag;
    this.CheckHeaderName(name);
    this.PrepareHeaderInfoForAdd(name, out info, out flag);
    this.ParseAndAddValue(name, info, value);
    if (flag && (info.ParsedValue != null))
    {
        this.AddHeaderToStore(name, info);
    }
}

     
 Add()方法有两单重载版本,该方式可为容器添加标头,如果只要添加的标头有标准名,在添加前面标头值会进展说明。Add方法还会见证明标头是否可产生多只价。

 

   1..NET4.5事先版本创建HTTP POST请求实例:

        public static string HttpPost(string postUrl, string postData)
        {
            if (string.IsNullOrEmpty(postUrl))
                throw new ArgumentNullException(postUrl);
            if (string.IsNullOrEmpty(postData))
                throw new ArgumentNullException(postData);
            var request = WebRequest.Create(postUrl) as HttpWebRequest;
            if (request == null)
                throw new ArgumentNullException("postUrl");
            try
            {
                var cookieContainer = new CookieContainer();
                request.CookieContainer = cookieContainer;
                request.AllowAutoRedirect = true;
                request.Method = "POST";
                request.ContentType = "application/x-www-form-urlencoded";
                var data = Encoding.UTF8.GetBytes(postData);
                request.ContentLength = data.Length;
                var outstream = request.GetRequestStream();
                outstream.Write(data, 0, data.Length);
                outstream.Close();
                //发送请求并获取相应回应数据,获取对应HTTP请求的响应
                var response = request.GetResponse() as HttpWebResponse;
                if (response != null)
                {
                    var instream = response.GetResponseStream();
                    var content = string.Empty;
                    if (instream == null)
                    {
                        return content;
                    }
                    using (var sr = new StreamReader(instream, Encoding.UTF8))
                    {
                        content = sr.ReadToEnd();
                    }
                    return content;
                }
            }
            catch (ArgumentException arex)
            {
                throw arex;
            }
            catch (IOException ioex)
            {
                throw ioex;
            }
            return null;
        }

 

   对于ASP.NET
WebAPI的优势和特点,在此地虽未开腔了,需要使用的本就是见面挑选,也未需要我浪费篇幅去教授这些,这首博文主要教授ASP.NET
WebAPI中的HTTP消息之布局和拍卖消息之为主目标。

劳务端的SSE发布是以Source[ServerSentEvent,NotUsed]来促成的。ServerSentEvent类型定义如下:

    1.HttpRequestMessage对象解析:

         (1).HttpRequestMessage主要性能与法概述:

名称 说明
Version 获取或设置 HTTP 消息版本
Content 获取或设置 HTTP 消息的内容
Method 获取或设置 HTTP 请求信息使用的 HTTP 方法
RequestUri 获取或设置 HTTP 请求的 Uri
Headers 获取 HTTP 请求标头的集合
Properties 获取 HTTP 请求的属性集
ToString 返回表示当前对象的字符串

        该对象要用以表示 HTTP
请求消息。对于拖欠目标的这些性与办法,大部分应有都未见面生,因为一个HTTP消息被最主要涵盖头部、消息内容等等,在此主要介绍一个特性Properties,该属性并无属其他正式的HTTP消息,当消息传时,不见面保留该属性。

         (2).Properties属性解析:

[__DynamicallyInvokable]
public IDictionary<string, object> Properties
{
    [__DynamicallyInvokable]
    get
    {
        if (this.properties == null)
        {
            this.properties = new Dictionary<string, object>();
        }
        return this.properties;
    }
}

   
有上述的代码可以生肯定的看出该属性只来一个单念属性,并返回一个IDictionary<string,
object>。当消息于服务器或者客户端本地开展拍卖时,该属性用于保存附加的信息信息。该属性只是一个通用的容器,保存本地信息属性。(与接受信息之连续相关的客户端认证;将信息和布局路由于进行匹配,得到的路由数据)

服务端:

   2.HttpResponseMessage靶解析:

        (1).HttpRequestMessage主要性能与艺术概述:

名称 说明
EnsureSuccessStatusCode 如果 HTTP 响应的 IsSuccessStatusCode 属性为  false, 将引发异常
StatusCode 获取或设置 HTTP 响应的状态代码
ReasonPhrase 获取或设置服务器与状态代码通常一起发送的原因短语
RequestMessage 获取或设置导致此响应消息的请求消息
IsSuccessStatusCode 获取一个值,该值指示 HTTP 响应是否成功

     
对于拖欠目标的局部性能没有列举,因为在HttpRequestMessage对象都介绍,如:Version、Content、Headers等,该目标主要用来表示
HTTP 响应消息。在这里要介绍StatusCode属性。

       (2).StatusCode属性:

[__DynamicallyInvokable]
public HttpStatusCode StatusCode
{
    [__DynamicallyInvokable, TargetedPatchingOptOut("Performance critical to inline this type of method across NGen image boundaries")]
    get
    {
        return this.statusCode;
    }
    [__DynamicallyInvokable]
    set
    {
        if ((value < ((HttpStatusCode) 0)) || (value > ((HttpStatusCode) 0x3e7)))
        {
            throw new ArgumentOutOfRangeException("value");
        }
        this.CheckDisposed();
        this.statusCode = value;
    }
}

   
 StatusCode属性为枚举属性,该属性可读而写,对于状态码这个定义,很多人口都是较了解之,在HTTP协议被,状态码主要是意味在消息之呼吁于服务器中拍卖的结果,状态有2XX,3XX,4XX,5XX等等,具体表示的义就不再描述。

 

   2.于.NET4.5本子被,处理HTTP的主导目标:

     
(1).在客户端与劳务器端使用相同的类。(HttpRequestMessage和HttpResponseMessage对象吃未含上下文消息,所以可以于服务器和客户端共用。)

     
(2).由于在.NET4.5遭遇引入了TAP(异步任务模型),所以于新的HTTP模型中,处理HTTP请求的法门可采用async和awit实现异步编程。(可以简单快捷之落实异步编程)

   
我们于新老的HTTP编程模型时,会要命爱的意识以新本子的HTTP模型中,无论是编程的难度和代码编写的精简度,已经推行之效率还是大高的。在对Web项目的支付中,我们本着HTTP知识之刺探是不可或缺的,对于ASP.NET的HTTP处理的法则在此地就是非开实际的牵线,网上为产生比较多的篇章可供应阅读与询问。

   
对于ASP.NET的HTTP处理方式的垂询,是自个儿在付出微信公众平台时更学习的,微信公众平台供了对外访问的接口,我们的程序与服务器对微信服务器的接口进行呼吁访问,微信服务器获取HTTP请求后,返回处理结果,本地服务器获取返回结果。这样一个请-响应模式,组成一个对话。对于微信公众平台的开支对多恰好学习.NET的食指的话稍高大(当然就是对立而言),即时开发了特别频繁以此类型的主次的食指(调用第三正值接口的支付)也非自然好好鲜明的掌握之里面的原理,笔者认为对这么的老三在平台的付出,其要的基本部分就是是对HTTP协议的处理,建立请求、获取响应消息和分析消息就三充分步骤,返回的信内容类同也json或者xml,获取响应消息继,主要是本着信息内容之反序列化,获得消息之实业信息,进而以程序中尤为处理。

   
在WeAPI中信息的出及剖析,以及消息之格式都是可以动态的开创同情商,下面我们愈的打听实现即时同样进程的为主目标。

下面是客户端程序的测试运算步骤:

   1.以.NET4.5之前的本中,处理HTTP的中心目标:

     
(1).在客户端:System.Net.HttpWebRequest用于初始化HTTP请求,处理有关的响应; System.Net.HttpWebResponse处理HTTP响应头和多少读取的追寻。

     
(2).在服务器端:System.Web.HttpContext,System.Web.HttpRequest,System.Web.HttpResponse类用当ASP.NET上产文中,代表单个请求和应。System.Net.HttpListenerContext类,提供对HTTP请求与响应对象的拜访。

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import scala.concurrent.duration.DurationInt
import akka.http.scaladsl.model.sse.ServerSentEvent

object SSEServer {

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    implicit val mat    = ActorMaterializer()
    Http().bindAndHandle(route, "localhost", 8011)

    scala.io.StdIn.readLine()
    system.terminate()
  }

  object SyncFiles {
    var fileToSync: String = ""
  }
  private def route = {
    import Directives._
    import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._

    def syncRequests =
      pathPrefix("sync") {
        pathSingleSlash {
        post {
            parameter("file") { filename =>
              complete {
                SyncFiles.fileToSync = filename
                s"set download file to : $filename"
              }
            }
          }
        }
      }

    def events =
      path("events") {
        get {
          complete {
            Source
              .tick(2.seconds, 2.seconds, NotUsed)
              .map( _ => processToServerSentEvent)
              .keepAlive(1.second, () => ServerSentEvent.heartbeat)
          }
        }
      }

    syncRequests ~ events
  }

  private def processToServerSentEvent: ServerSentEvent = {
    Thread.sleep(3000)   //processing delay
    ServerSentEvent(SyncFiles.fileToSync)
  }
}

 
 对于.NET的分布式应用开发,可以供我们挑选的艺和框架比较多,例如webservice,.net
remoting,MSMQ,WCF等等技术。对于这些技术很多人口都无见面生,即时没有尖锐的询问,但是毫无疑问听说过,每种技术还各出优势与适用范围,没有断的高低,只有相对的适用程度。不过可惜了,今天咱们上课的主题不是即时几乎栽技术,今天紧要教授的是ASP.NET
WebAPI。

  private def processToServerSentEvent: ServerSentEvent = {
    Thread.sleep(3000)   //processing delay
    ServerSentEvent(SyncFiles.fileToSync)
  }

   2..NET4.5本创建HTTP POST请求实例:

async static void getResponse(string url)
        {
            using (HttpClient client = new HttpClient())
            {
                using (HttpResponseMessage response = await client.GetAsync(url))
                {
                    using (HttpContent content = response.Content)
                    {
                        string myContent = await content.ReadAsStringAsync();
                    }
                }
            }
        }
        async static void postResponse(string url)
        {
            while (true)
            {
                IEnumerable<KeyValuePair<string, string>> queries = new List<KeyValuePair<string, string>>()
            {
                new KeyValuePair<string, string> ("test","test")
            };
                HttpContent q = new FormUrlEncodedContent(queries);
                using (HttpClient client = new HttpClient())
                {
                    using (HttpResponseMessage response = await client.PostAsync(url, q))
                    {
                        using (HttpContent content = response.Content)
                        {
                            string myContent = await content.ReadAsStringAsync();

                            Console.WriteLine(myContent);
                        }
                    }
                }
            }
        }

以此类别的参数代表事件信息的数据结构。用户可根据实际用充分利用这个数据结构来传递信息。服务端是由此complete以SeverSentEvent类为元素的Source来开展SSE的,如下:

二.WebAPI的HTTP消息分析:

     
HTTP协议的做事章程是于客户端和服务器之间交换请求与应消息,那么就为尽管足以证实HTTP的核心就是信息,对于“消息”的打听,我们若掌握消息分为“消息头部”和“消息内容”,我们连下的对新HTTP编程模型的牵线的关键性虽是“消息头部”和“消息内容”。

     
在命名空间System.Net.Http中,具有两独核心目标:HttpRequestMessage和HttpResponseMessage。两单目标的组织使下图:

葡京国际平台 1

     
以上重点教学了HttpRequestMessage对象及HttpResponseMessage对象涵盖的要内容,请求与应消息还足以蕴涵一个可选的音正文,两蒙信息类型及消息内容,都好运用应的标头。接下来具体了解有音的构造。

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.client.RequestBuilding.Get
import akka.http.scaladsl.model.HttpMethods
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import akka.http.scaladsl.model.sse.ServerSentEvent
import akka.http.scaladsl.model._

object SSEClient {

  def downloadFiles(file: String) = {
    Thread.sleep(3000)   //process delay
    if (file != "")
      println(s"Try to download $file")
  }

  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    implicit val mat    = ActorMaterializer()

    import akka.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling._
    import system.dispatcher

    Http()
      .singleRequest(Get("http://localhost:8011/events"))
      .flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]])
      .foreach(_.runForeach(se => downloadFiles(se.data)))

    scala.io.StdIn.readLine()
    println("do some thing ...")
    Http().singleRequest(
      HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Orders")
    ).onSuccess {
      case msg => println(msg)
    }

    scala.io.StdIn.readLine()
    println("do some other things ...")
    Http().singleRequest(
      HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Items")
    ).onSuccess {
      case msg => println(msg)
    }


    scala.io.StdIn.readLine()
    system.terminate()
  }
}

三.DotNet中初老HTTP模型分析:

 

一.WebAPI的HTTP概述:

 
 有关HTTP协议的相干内容在此地就是非举行牵线,在作者前面的博文中已经举行过介绍,现在供一下地方,因为过多的废话就是浪费时间,我哪怕聊看即篇博文的读者都对HTTP协议及WebAPI都负有了解。博文地址:

http://www.cnblogs.com/pengze0902/p/5976388.html

http://www.cnblogs.com/pengze0902/p/6224792.html

http://www.cnblogs.com/pengze0902/p/6230105.html

 

四.总结:

 
 以上主要教学了.NET4.5之前和今后版本对HTTP编程模式之片段内容, 两者的重点分在于.NET4.5本之前的HTTP编程模型会区分客户端和服务器,两者采用的目标是不同,实现之规律上虽有一定之相似性,但是利用的近乎可差。.NET4.5后头的版本被,对象的以无客户端和服务器的分,两者可以共用。

    import akka.http.scaladsl.unmarshalling.sse.EventStreamUnmarshalling._
    import system.dispatcher

    Http()
      .singleRequest(Get("http://localhost:8011/events"))
      .flatMap(Unmarshal(_).to[Source[ServerSentEvent, NotUsed]])
      .foreach(_.runForeach(se => downloadFiles(se.data)))

   4.HTTP消息内容分析:

     
在.NET4.5本的HTTP模型中,HTTP消息之正文由抽象基类HttpContent表示,HttpResponseMessage和HttpRequestMessage对象还包含一个HttpContent类型的Content属性。

     (1).HttpContent主要性能和措施:

名称 描述
ReadAsByteArrayAsync 以异步操作将 HTTP 内容写入字节数组。
SerializeToStreamAsync 以异步操作将 HTTP 内容序列化到流。
CopyToAsync 以异步操作将 HTTP 内容写入流。
LoadIntoBufferAsync 以异步操作将 HTTP 内容序列化到内存缓冲区。
CreateContentReadStreamAsync 以异步操作将 HTTP 内容写入内存流。
TryComputeLength 确定 HTTP 内容是否具备有效的字节长度。
Headers 根据 RFC 2616 中的定义,获取内容标头。

     (2).CopyToAsync()方法分析:

[__DynamicallyInvokable]
public Task CopyToAsync(Stream stream, TransportContext context)
{
    Action<Task> continuation = null;
    this.CheckDisposed();
    if (stream == null)
    {
        throw new ArgumentNullException("stream");
    }
    TaskCompletionSource<object> tcs = new TaskCompletionSource<object>();
    try
    {
        Task task = null;
        if (this.IsBuffered)
        {
            task = Task.Factory.FromAsync<byte[], int, int>(new Func<byte[], int, int, 
            AsyncCallback, object, IAsyncResult>(stream.BeginWrite), new Action<IAsyncResult>(stream.EndWrite), 
       this.bufferedContent.GetBuffer(), 0, (int) this.bufferedContent.Length, null);
        }
        else
        {
            task = this.SerializeToStreamAsync(stream, context);
            this.CheckTaskNotNull(task);
        }
        if (continuation == null)
        {
            continuation = delegate (Task copyTask) {
                if (copyTask.IsFaulted)
                {
                    tcs.TrySetException(GetStreamCopyException(copyTask.Exception.GetBaseException()));
                }
                else if (copyTask.IsCanceled)
                {
                    tcs.TrySetCanceled();
                }
                else
                {
                    tcs.TrySetResult(null);
                }
            };
        }
        task.ContinueWithStandard(continuation);
    }
    catch (IOException exception)
    {
        tcs.TrySetException(GetStreamCopyException(exception));
    }
    catch (ObjectDisposedException exception2)
    {
        tcs.TrySetException(GetStreamCopyException(exception2));
    }
    return tcs.Task;
}

   
在使信息内容时,需要用HtppContent的法子还是扩展方法。在HttpContent中使CopyToAsync()方法以推送方式访原本之信息内容,由方代码可以观看,该方式接受两单参数,一个凡是流动对象,一个凡有关传输的音信(例如,通道绑定),此参数可以啊
null。该方式好把信内容写副到这流中。

    在拖欠方式的贯彻代码中
创建了一个TaskCompletionSource<object>的泛型对象,该目标表示不绑定到委托的 Task<TResult> 的制造者方,并由此 Task 属性提供针对性使用者正在的拜访。SerializeToStreamAsync方法以盛传的流对象序列化,该法为异步方法。

   
我们要留意的几乎接触,主要也委托的创立同运用,在C#遭,尽量使有.NET提供的委托类,不要自己失去创造。还有一些虽是以先后中针对生的处理方式,异常的破获具有层次性,并且调用了打定义的一个好处理措施TrySetException。

    (2).ReadAsStreamAsync()方法分析:

     
在得到旧消息内容时,除了调用上面介绍的点子外,还可调用ReadAsStreamAsync()方法以拉取的章程访原本之音讯内容。

     
在HttpContent中含有另外两单近乎的艺术,ReadAsStringAsync()和ReadAsByteArrayAsync()异步的提供信息内容之缓冲副本,ReadAsByteArrayAsync()返回原的字节内容,ReadAsStringAsync()将内容解码为字符串返回。

/**
 * Representation of a server-sent event. According to the specification, an empty data field designates an event
 * which is to be ignored which is useful for heartbeats.
 *
 * @param data data, may span multiple lines
 * @param eventType optional type, must not contain \n or \r
 * @param id optional id, must not contain \n or \r
 * @param retry optional reconnection delay in milliseconds
 */
final case class ServerSentEvent(
  data:      String,
  eventType: Option[String] = None,
  id:        Option[String] = None,
  retry:     Option[Int]    = None) {...}

 
 因为自身了解Akka-http的要害目的不是以有关Web-Server的编程,而是想实现同仿照系统并的api,所以啊急需考虑由劳务端主动向客户端发送指令的以场景。比如一个零售店管理平台的服务端在完成了某些数据更新后需通知各零售门市客户端下载最新数据。虽然Akka-http也提供对websocket合计的支持,但websocket的网络连接是双向恒久的,适合频繁之问答交互式服务端与客户端的交流,消息结构吧比较零碎。而我们面临的可能是批次型的恢宏数据库数据交换,只待简单的服务端单向信息就是执行了,所以websocket不绝对劲,而Akka-http的SSE应该于可我们的要求。SSE模式之基本原理是服务端统一集中揭晓消息,各客户端持久订阅服务端发布的信并于信之情节遭筛选产生属于自己当推行之通令,然后进行相应的处理。客户端接收SSE是当一个独门的线程里持续进行的,不见面潜移默化客户端当前之演算流程。当接到有效的信息后哪怕见面调用一个事情功能函数作为后台异步运算任务。

客户端订阅SSE的主意如下:

 

    import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._
         complete {
            Source
              .tick(2.seconds, 2.seconds, NotUsed)
              .map( _ => processToServerSentEvent)
              .keepAlive(1.second, () => ServerSentEvent.heartbeat)
          }

运算结果葡京国际平台:

 

  def downloadFiles(file: String) = {
    Thread.sleep(3000)   //process delay
    if (file != "")
      println(s"Try to download $file")
  }

于客户端收到SSE后就是运行downloadFiles(filename)函数。downloadFiles函数定义:

 我之博客即将同到腾讯云+社区。邀大家一同入驻http://cloud.tencent.com/developer/support-plan

上述代码代表劳务端定时运算processToServerSentEvent返回ServerSentEvent类型结果后发布给有订阅的客户端。我们因此一个函数processToServerSentEvent模拟重复运算的事务职能:

 

  object SyncFiles {
    var fileToSync: String = ""
  }
  private def route = {
    import Directives._
    import akka.http.scaladsl.marshalling.sse.EventStreamMarshalling._

    def syncRequests =
      pathPrefix("sync") {
        pathSingleSlash {
        post {
            parameter("file") { filename =>
              complete {
                SyncFiles.fileToSync = filename
                s"set download file to : $filename"
              }
            }
          }
        }
      }

脚是本次讨论的示范源代码:

 

 

客户端:

do some thing ...
HttpResponse(200 OK,List(Server: akka-http/10.0.10, Date: Fri, 15 Dec 2017 05:50:52 GMT),HttpEntity.Strict(text/plain; charset=UTF-8,set download file to : Orders),HttpProtocol(HTTP/1.1))
Try to download Orders
Try to download Orders

do some other things ...
HttpResponse(200 OK,List(Server: akka-http/10.0.10, Date: Fri, 15 Dec 2017 05:51:02 GMT),HttpEntity.Strict(text/plain; charset=UTF-8,set download file to : Items),HttpProtocol(HTTP/1.1))
Try to download Orders
Try to download Orders
Try to download Items
Try to download Items

Try to download Items

Process finished with exit code 0

本条函数模拟发布事件数量是某种业务运算结果,在这里表示客户端需要下载文件名称。我们所以客户端request来学设定是文件名称:

 

    scala.io.StdIn.readLine()
    println("do some thing ...")
    Http().singleRequest(
      HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Orders")
    ).onSuccess {
      case msg => println(msg)
    }

    scala.io.StdIn.readLine()
    println("do some other things ...")
    Http().singleRequest(
      HttpRequest(method=HttpMethods.POST,uri = "http://localhost:8011/sync/?file=Items")
    ).onSuccess {
      case msg => println(msg)
    }