檢視原始碼 如何為 Erlang 分散式處理實作替代傳輸載體

本節說明如何為 Erlang 分散式處理實作替代傳輸協定。分散式處理通常透過 TCP/IP 進行。此處解釋一種將 TCP/IP 替換為其他協定的方法。

本節逐步說明 uds_dist 範例應用程式(位於 Kernel 應用程式的 examples 目錄中)。uds_dist 應用程式實作透過 Unix 網域套接字的分散式處理,並且是為 Sun Solaris 2 作業環境編寫的。但是,這些機制是通用的,適用於 Erlang 執行的任何作業系統。C 程式碼不具可移植性的原因,純粹是為了提高可讀性。

簡介

要為 Erlang 分散式處理實作新的傳輸載體,主要步驟如下。

注意

從 ERTS 版本 10.0 開始,引入了對分散式處理控制器程序的支持。也就是說,分散式處理通道上的流量可以由一個程序管理,而不僅僅是一個端口。這使得在 Erlang 程式碼中實作大部分邏輯成為可能,您甚至可能不需要為協定編寫新的驅動程式。一個例子可能是使用 gen_udp 透過 UDP 進行 Erlang 分散式處理(在此範例中,您的 Erlang 程式碼當然必須處理重傳等問題)。也就是說,根據您想做的事情,您也許根本不需要實作驅動程式,然後可以跳過以下與驅動程式相關的章節。分散式處理模組章節中描述的 gen_tcp_disterl_uds_dist 範例利用分散式處理控制器程序,如果您想使用分散式處理控制器程序,值得一看。

撰寫 Erlang 驅動程式

首先,必須使協定對 Erlang 機器可用,這涉及編寫 Erlang 驅動程式。無法使用埠程式,必須使用 Erlang 驅動程式。Erlang 驅動程式可以是

  • 靜態連結到模擬器,這是在使用 Erlang 開源發行版時的一種替代方案,或者
  • 動態載入到 Erlang 機器位址空間中,如果要使用 Erlang 的預編譯版本,這是唯一的替代方案

編寫 Erlang 驅動程式並不容易。驅動程式編寫為 Erlang 模擬器在資料傳送到驅動程式時,或驅動程式在檔案描述符上具有任何可用資料時呼叫的一些回呼函式。由於驅動程式回呼常式在 Erlang 機器的主要執行緒中執行,因此回呼函式不能執行任何阻塞活動。回呼僅用於設定檔案描述符以進行等待和/或讀/寫可用資料。所有 I/O 必須是非阻塞的。但是,驅動程式回呼是依序執行的,因此可以在常式內安全地更新全域狀態。

為驅動程式撰寫 Erlang 介面

實作驅動程式後,最好為驅動程式編寫一個 Erlang 介面,以便能夠單獨測試驅動程式的功能。然後,此介面可以由分散式處理模組使用,該模組將涵蓋來自 net_kernel 的協定詳細資訊。

最簡單的方法是模仿 inetinet_tcp 介面,但不需要實作這些模組中的許多功能。在範例應用程式中,僅實作了一些常用的介面,而且它們經過了很大的簡化。

撰寫分散式處理模組

當透過驅動程式和 Erlang 介面模組使協定對 Erlang 可用時,就可以編寫分散式處理模組。分散式處理模組是一個具有明確定義的回呼的模組,很像 gen_server(但是,沒有編譯器支援來檢查回呼)。此模組實作

  • 尋找其他節點的詳細資訊(也就是說,與 epmd 或類似程式通訊)
  • 建立接聽埠(或類似埠)
  • 連線到其他節點
  • 執行交握/Cookie 驗證

但是,有一個實用程式模組 dist_util,它完成了處理交握、Cookie、計時器和計時的大部分繁瑣工作。使用 dist_util 可以更輕鬆地實作分散式處理模組,這在範例應用程式中完成了。

建立開機指令碼

最後一步是建立開機指令碼,以使協定實作在開機時可用。可以透過在所有系統都執行時啟動分散式處理來偵錯實作,但在實際系統中,分散式處理應儘早啟動,因此需要開機指令碼和一些命令列參數。

此步驟也暗示介面和分散式處理模組中的 Erlang 程式碼是以可以在啟動階段執行的方式編寫的。特別是,不能呼叫 application 模組或任何在開機時未載入的模組。也就是說,只能使用 KernelSTDLIB 和應用程式本身。

分散式處理模組

分散式處理模組公開一個 API,net_kernel 會呼叫該 API 以管理與其他節點的連線。模組名稱應具有後綴 _dist

該模組需要建立某種接聽實體(程序或埠)和一個接受器程序,該程序使用接聽實體接受連入連線。對於每個連線,該模組至少需要建立一個連線監管程序,該程序還負責在設定連線時進行交握,以及一個負責透過連線傳輸資料的分散式處理控制器(程序或埠)。分散式處理控制器和連線監管程序應連結在一起,以便在連線關閉時清除它們。

請注意,每個連線都需要恰好一個分散式處理控制器。程序或埠只能作為一個連線的分散式處理控制器。作為分散式處理控制器的註冊無法撤銷。它會一直存在,直到分散式處理控制器終止。分散式處理控制器不應忽略結束訊號。它被允許捕獲結束,但是當收到結束訊號時,它應該自願終止。

分散式處理模組的範例實作可以在 $ERL_TOP/lib/kernel/examples/gen_tcp_dist/src/gen_tcp_dist.erl 中找到。它使用由程序實作的分散式處理控制器,透過 gen_tcp API 實作 TCP/IP 分散式處理。這與使用埠分散式處理控制器的普通 TCP/IP 分散式處理不同。

分散式處理模組的另一個範例實作可以在 $ERL_TOP/lib/kernel/examples/erl_uds_dist/src/erl_uds_dist.erl 中找到。它使用由程序實作的分散式處理控制器,透過 gen_tcp API 實作 Unix 網域套接字分散式處理。與使用 C 編寫的埠驅動程式的原始 uds_dist 範例相比,erl_uds_dist 完全用 Erlang 編寫。

匯出的回呼函式

以下函式是強制性的

  • listen(Name) ->
      {ok, {Listen, Address, Creation}} | {error, Error}
    listen(Name,Host) ->
      {ok, {Listen, Address, Creation}} | {error, Error}

    listen/2 會在啟動分散式處理時呼叫一次,以便接聽連入連線請求。引數 Name 是完整節點名稱中 @ 符號之前的部分。它可以是原子或字串。引數 Host 是完整節點名稱中 @ 符號之後的部分。它始終是字串。

    傳回值包含一個 Listen 控制代碼(稍後會傳遞給 accept/1 回呼)、Address,它是一個 #net_address{} 記錄,其中包含有關節點位址的資訊(#net_address{} 記錄定義在 kernel/include/net_address.hrl 中)和 Creation,它(目前)是一個整數 123

    如果要使用 epmd 進行節點探索,通常需要使用 erl_epmd 模組(kernel 應用程式的一部分),以便向 epmd 註冊接聽埠並檢索要使用的 Creation

  • address() ->
      Address

    address/0 會被呼叫,以在不建立接聽通訊端的情況下取得 listen/2 函式的 Address 部分。除了 address 之外,所有欄位都必須在傳回的記錄中設定

    範例

    address() ->
        {ok, Host} = inet:gethostname(),
        #net_address{ host = Host, protocol = tcp, family = inet6 }.
  • accept(Listen) ->
      AcceptorPid

    accept/1 應產生一個接受連線的程序。此程序最好以 max 優先順序執行。應傳回此程序的程序識別碼。

    Listen 引數將與上述 listen/1 回呼的傳回值的 Listen 控制代碼部分相同。accept/1 僅在啟動分散式處理協定時呼叫一次。

    此函式的呼叫者是 net_kernel 的代表(這可能是也可能不是註冊為 net_kernel 的程序),並且在本文件中識別為 Kernel。當接受器程序接受連線時,它需要通知 Kernel 有關接受的連線。這是透過傳遞形式為

    Kernel ! {accept, AcceptorPid, DistController, Family, Proto}

    DistController 是連線的分散式處理控制器的程序或埠識別碼。分散式處理控制器應由接受器程序在接受新連線時建立。它的工作是在連線上分派流量。

    Kernel 以以下訊息之一回應

    • {Kernel, controller, SupervisorPid} - 請求已接受,且 SupervisorPid 是連線監管程序的程序識別碼(該程序是在 accept_connection/5 回呼中建立的)。

    • {Kernel, unsupported_protocol} - 請求已遭拒。這是一個嚴重錯誤。接受器程序應終止。

    當接受序列完成時,接受器程序應繼續接受進一步的請求。

  • accept_connection(AcceptorPid, DistCtrl, MyNode, Allowed, SetupTime) ->
      ConnectionSupervisorPid

    accept_connection/5 應產生一個程序,該程序將執行連線的 Erlang 分散式交握。如果交握成功完成,它應繼續作為連線監管器運作。此程序最好以 max 優先級執行,並應連結到呼叫者。dist_util:net_ticker_spawn_options() 函數可以被呼叫以取得適用於此程序的產生選項,這些選項可以直接傳遞給 erlang:spawn_opt/4dist_util:net_ticker_spawn_options() 預設會傳回 [link, {priority, max}],但允許使用者使用 net_ticker_spawn_options 核心參數設定更多選項。應傳回此程序的程序識別碼。

    參數

    • AcceptorPid - 由 accept/1 回呼建立的程序之程序識別碼。

    • DistCtrl - 由接受器程序建立的分散式控制器識別碼。要傳遞給 dist_util:handshake_other_started(HsData)

    • MyNode - 此節點的節點名稱。要傳遞給 dist_util:handshake_other_started(HsData)

    • Allowed - 要傳遞給 dist_util:handshake_other_started(HsData)

    • SetupTime - 透過呼叫 dist_util:start_timer(SetupTime) 來建立設定計時器的時間。計時器應傳遞給 dist_util:handshake_other_started(HsData)

    建立的程序應在 #hs_data{} 記錄中提供交握所需的回呼和其他資訊,並使用此記錄呼叫 dist_util:handshake_other_started(HsData)

    dist_util:handshake_other_started(HsData) 將執行交握,如果交握成功完成,則此程序將在連線正常運作時繼續在連線監管器迴圈中運作。

  • setup(Node, Type, MyNode, LongOrShortNames, SetupTime) ->
      ConnectionSupervisorPid

    setup/5 應產生一個程序,該程序會連線到 Node。當連線建立後,它應執行連線的 Erlang 分散式交握。如果交握成功完成,它應繼續作為連線監管器運作。此程序最好以 max 優先級執行,並應連結到呼叫者。dist_util:net_ticker_spawn_options() 函數可以被呼叫以取得適用於此程序的產生選項,這些選項可以直接傳遞給 erlang:spawn_opt/4dist_util:net_ticker_spawn_options() 預設會傳回 [link, {priority, max}],但允許使用者使用 net_ticker_spawn_options 核心參數設定更多選項。應傳回此程序的程序識別碼。

    參數

    • Node - 遠端節點的節點名稱。要傳遞給 dist_util:handshake_we_started(HsData)

    • Type - 連線類型。要傳遞給 dist_util:handshake_we_started(HsData)

    • MyNode - 此節點的節點名稱。要傳遞給 dist_util:handshake_we_started(HsData)

    • LongOrShortNames - 表示是否使用長名稱或短名稱的原子 longnames 或原子 shortnames

    • SetupTime - 透過呼叫 dist_util:start_timer(SetupTime) 來建立設定計時器的時間。計時器應傳遞給 dist_util:handshake_we_started(HsData)

    此函數的呼叫者是 net_kernel 的代表(這可能或可能不是註冊為 net_kernel 的程序),並且在本文檔中被識別為 Kernel

    除了產生連線監管器之外,此函數還應建立分散式控制器。分散式控制器是負責調度流量的程序或埠。

    建立的程序應在 #hs_data{} 記錄中提供交握所需的回呼和其他資訊,並使用此記錄呼叫 dist_util:handshake_we_started(HsData)

    dist_util:handshake_we_started(HsData) 將執行交握,並且如果交握成功完成,則此程序將在連線正常運作時繼續在連線監管器迴圈中運作。

  • close(Listen) ->
      void()

    呼叫以關閉最初從 listen/1 回呼傳遞的 Listen 控制代碼。

  • select(NodeName) ->
      boolean()

    如果 NodeName 的主機名稱部分對於此協定有效,則傳回 true;否則,傳回 false

還有兩個可能匯出的可選函數

  • setopts(Listen, Opts) ->
      ok | {error, Error}

    參數 Listen 是最初從 listen/1 回呼傳遞的控制代碼。參數 Opts 是要在未來連線上設定的選項清單。

  • getopts(Listen, Opts) ->
      {ok, OptionValues} | {error, Error}

    參數 Listen 是最初從 listen/1 回呼傳遞的控制代碼。參數 Opts 是要為未來連線讀取的選項清單。

#hs_data{} 記錄

dist_util:handshake_we_started/1dist_util:handshake_other_started/1 函數採用 #hs_data{} 記錄作為參數。此記錄中有許多您需要設定的欄位。此記錄定義於 kernel/include/dist_util.hrl 中。未記錄的欄位不應設定,即應保留為 undefined

除非另有說明,否則需要設定下列 #hs_data{} 記錄欄位

  • kernel_pid - Kernel 程序的程序識別碼。也就是說,呼叫 setup/5accept_connection/5 的程序。

  • other_node - 其他節點的名稱。只有當此節點起始連線時,此欄位才是必要的。也就是說,當連線透過 setup/5 設定時。

  • this_node - 此節點的節點名稱。

  • socket - 分散式控制器的識別碼。

  • timer - 使用 dist_util:start_timer/1 建立的計時器。

  • allowed - 作為 Allowed 傳遞給 accept_connection/5 的資訊。只有當遠端節點起始連線時,此欄位才是必要的。也就是說,當連線透過 accept_connection/5 設定時。

  • f_send - 具有下列簽章的 Fun

    fun (DistCtrlr, Data) -> ok | {error, Error}

    其中 DistCtrlr 是分散式控制器的識別碼,而 Data 是要傳遞到另一端的 IO 資料。

    僅在交握階段期間使用。

  • f_recv - 具有下列簽章的 Fun

    fun (DistCtrlr, Length) -> {ok, Packet} | {error, Reason}

    其中 DistCtrlr 是分散式控制器的識別碼。如果 Length0,則應傳回所有可用的位元組。如果 Length > 0,則應傳回剛好 Length 個位元組,或傳回錯誤;當連線從另一端關閉時,可能會捨棄少於 Length 個位元組的資料。它用於從另一端被動接收資料。

    僅在交握階段期間使用。

  • f_setopts_pre_nodeup - 具有下列簽章的 Fun

    fun (DistCtrlr) -> ok | {error, Error}

    其中 DistCtrlr 是分散式控制器的識別碼。在分配通道用於正常流量之前呼叫。

    僅在交握階段期間使用。

  • f_setopts_post_nodeup - 具有下列簽章的 Fun

    fun (DistCtrlr) -> ok | {error, Error}

    其中 DistCtrlr 是分散式控制器的識別碼。在分配通道用於正常流量之後呼叫。

    僅在交握階段期間使用。

  • f_getll - 具有下列簽章的 Fun

    fun (DistCtrlr) -> ID

    其中 DistCtrlr 是分散式控制器的識別碼,而 ID 是處理連線的低階實體之識別碼(通常是 DistCtrlr 本身)。

    僅在交握階段期間使用。

  • f_address - 具有下列簽章的 Fun

    fun (DistCtrlr, Node) -> NetAddress

    其中 DistCtrlr 是分散式控制器的識別碼,Node 是另一端節點的節點名稱,而 NetAddress#net_address{} 記錄,其中包含有關連線另一端 Node 的位址的資訊。#net_address{} 記錄定義於 kernel/include/net_address.hrl 中。

    僅在交握階段期間使用。

  • mf_tick - 具有下列簽章的 Fun

    fun (DistCtrlr) -> void()

    其中 DistCtrlr 是分散式控制器的識別碼。此函數應透過連線傳送另一端不解譯的資訊,同時增加另一端接收封包的統計資訊。這通常透過傳送空的封包來實作。

    注意

    此操作對於呼叫者來說不會長時間封鎖是非常重要的。因為這是從連線監管器呼叫的。

    當連線正常運作時使用。

  • mf_getstat - 具有下列簽章的 Fun

    fun (DistCtrlr) -> {ok, Received, Sent, PendSend}

    其中 DistCtrlr 是分佈控制器的識別符,Received 是已接收的封包,Sent 是已發送的封包,而 PendSend 是佇列中待發送的資料量(通常以位元組為單位,但 dist_util 僅檢查該值是否為非零,以判斷佇列中是否有資料),或是一個 boolean/0,表示佇列中是否有封包待發送。

    注意

    此操作對於呼叫者來說不會長時間封鎖是非常重要的。因為這是從連線監管器呼叫的。

    當連線正常運作時使用。

  • request_type - 傳遞給 setup/5 的請求 Type。只有當此節點發起連線時,此為必要項目。也就是說,連線是透過 setup/5 建立的。

  • mf_setopts - 具有以下簽名的 fun

    fun (DistCtrl, Opts) -> ok | {error, Error}

    其中 DistCtrlr 是分佈控制器的識別符,而 Opts 是要在連線上設定的選項列表。

    此函式為可選。用於連線已建立時。

  • mf_getopts - 具有以下簽名的 fun

    fun (DistCtrl, Opts) -> {ok, OptionValues} | {error, Error}

    其中 DistCtrlr 是分佈控制器的識別符,而 Opts 是要讀取連線的選項列表。

    此函式為可選。用於連線已建立時。

  • f_handshake_complete - 具有以下簽名的 fun

    fun (DistCtrlr, Node, DHandle) -> void()

    其中 DistCtrlr 是分佈控制器的識別符,Node 是另一端連線的節點名稱,而 DHandle 是分佈控制器程序在呼叫以下 BIF 時所需的分配控制代碼

    當握手完成且分佈通道建立時,會呼叫此函式。分佈控制器可以開始在通道上調度流量。此函式為可選。

    僅在交握階段期間使用。

  • add_flags - 要添加到連線的 分佈標誌。目前,所有(非過時的)標誌都會自動啟用。

    此標誌欄位為可選。

  • reject_flags - 要拒絕的 分佈標誌。目前可以拒絕以下分佈標誌

    • DFLAG_DIST_HDR_ATOM_CACHE - 不要在此連線上使用原子快取。

    • DFLAG_FRAGMENTS - 將大型分佈訊息分割成多個片段。

    此標誌欄位為可選。

    另請參閱 分佈資料傳遞

  • require_flags - 要求使用這些 分佈標誌。如果另一端未使用這些標誌,則會在握手期間中止連線。

    此標誌欄位為可選。

分佈資料傳遞

當使用預設配置時,要透過連線傳遞的資料需要以完全相同的順序,且不遺失任何資料,原封不動地傳遞到接收端的節點,如同從發送節點發送的一樣。

可以透過停用需要嚴格排序的功能來放寬資料傳遞順序。這可以透過在設定連線時使用的 #hs_data{} 記錄的 reject_flags 欄位中傳遞由 dist_util:strict_order_flags/0 返回的 分佈標誌 來完成。當使用寬鬆排序時,只有具有相同發送者/接收者對的信號的順序必須保留。但是,請注意,停用需要嚴格排序的功能可能會對效能、吞吐量和/或延遲產生負面影響。

啟用您的分佈模組

為了讓 net_kernel 找出要使用的分佈模組,使用了 erl 命令列引數 -proto_dist。後面跟著一個或多個分佈模組名稱,並刪除了字尾 "_dist"。也就是說,將 gen_tcp_dist 作為分佈模組指定為 -proto_dist gen_tcp

如果未使用任何 epmd(TCP 連接埠對應精靈),也需要指定命令列選項 -no_epmd,這會使 Erlang 跳過 epmd 的啟動,無論是作為作業系統程序還是 Erlang 對應項。

驅動程式

注意

本節是很久以前寫的。其中大部分仍然有效,但從那時起,一些事情發生了變化。對此處介紹的驅動程式的文件進行了一些更新,但還有更多可以做且計畫在未來進行。建議讀者也閱讀 erl_driverdriver_entry 文件。

儘管 Erlang 驅動程式一般可能超出本節的範圍,但簡要介紹似乎是適當的。

一般驅動程式

Erlang 驅動程式是以 C(或組合語言)編寫的原生程式碼模組,它充當某些特殊作業系統服務的介面。這是一個通用機制,在整個 Erlang 模擬器中用於各種 I/O。Erlang 驅動程式可以在執行階段透過使用 erl_ddll Erlang 模組動態連結(或載入)到 Erlang 模擬器。但是,OTP 中的某些驅動程式會靜態連結到執行階段系統,但這更多的是一種最佳化,而不是必要條件。

驅動程式資料類型和驅動程式編寫者可用的函式定義在位於 Erlang include 目錄中的標頭檔 erl_driver.h 中。請參閱 erl_driver 文件,以了解哪些函式可用。

當編寫驅動程式以使通訊協定可供 Erlang 使用時,應該了解有關該特定協定的一切值得了解的內容。所有操作都必須是非阻塞的,並且必須在驅動程式中考慮所有可能的情況。不穩定的驅動程式會影響和/或使整個 Erlang 執行階段系統崩潰。

模擬器會在以下情況下呼叫驅動程式

  • 當載入驅動程式時。此回呼必須具有特殊名稱,並透過返回指向 ErlDrvEntry 結構的指標來告知模擬器要使用哪些回呼,該結構必須正確填寫(請參閱下文)。
  • 當開啟指向驅動程式的連接埠時(由 Erlang 的 open_port 呼叫)。此常式用於設定內部資料結構並返回類型為 ErlDrvData 的不透明資料實體,該資料類型足以容納指標。此函式返回的指標是與此特定連接埠相關的所有其他回呼的第一個引數。它通常稱為連接埠控制代碼。模擬器僅儲存控制代碼,並且永遠不會嘗試解釋它,這就是為什麼它可以是幾乎任何東西(任何不比指標大的東西),並且如果是指標,則可以指向任何東西。通常,此指標是指向保存有關特定連接埠資訊的結構,就像範例中一樣。
  • 當 Erlang 程序將資料發送到連接埠時。資料以位元組緩衝區的形式到達,其解釋未定義,而是由實作者決定。此回呼不會向呼叫者返回任何內容,回應會以訊息的形式發送給呼叫者(使用一個名為 driver_output 的常式,所有驅動程式都可以使用)。還有一種與驅動程式進行同步通訊的方式,如下所述。還有一個額外的回呼函式,用於處理碎片化(在深度 io 列表中發送)的資料。該介面以適用於 Unix writev 而不是單個緩衝區的形式取得資料。分佈驅動程式不需要實作這樣的回呼,因此我們不會。
  • 當為輸入發出檔案描述器信號時。當模擬器偵測到驅動程式透過使用介面 driver_select 標記為監視的檔案描述器上的輸入時,會呼叫此回呼。驅動程式選擇的機制使得可以透過在需要讀取時呼叫 driver_select,然後在此回呼中執行讀取(當可以讀取時),從檔案描述器非阻塞讀取。典型的場景是,當 Erlang 程序訂購讀取操作時,會呼叫 driver_select,並且當檔案描述器上有資料可用時,此常式會發送回應。
  • 當為輸出發出檔案描述器信號時。此回呼的呼叫方式與上一個類似,但當可以寫入檔案描述器時。通常的場景是,Erlang 在檔案描述器上訂購寫入,並且驅動程式呼叫 driver_select。當描述器準備好輸出時,會呼叫此回呼,並且驅動程式可以嘗試發送輸出。佇列可以參與此類操作,並且驅動程式編寫者可以使用方便的佇列常式。
  • 當連接埠關閉時,無論是由 Erlang 程序還是由驅動程式呼叫其中一個 driver_failure_XXX 常式。此常式用於清理與特定連接埠相關的所有內容。當其他回呼呼叫 driver_failure_XXX 常式時,會立即呼叫此常式。發出錯誤的回呼常式不能再使用連接埠的資料結構,因為此常式肯定已釋放所有關聯的資料並關閉所有檔案描述器。但是,如果使用了驅動程式編寫者可用的佇列實用程式,則不會呼叫此常式,直到佇列為空。
  • 當 Erlang 處理程序呼叫 erlang:port_control/3 時,這是一個驅動程式的同步介面。控制介面用於設定驅動程式選項、變更埠的狀態等等。此介面在範例中經常使用。
  • 當計時器到期時。驅動程式可以使用函式 driver_set_timer 設定計時器。當此類計時器到期時,會呼叫特定的回呼函式。範例中未使用任何計時器。
  • 當整個驅動程式卸載時。驅動程式配置的所有資源都必須釋放。

分佈驅動程式的資料結構

用於 Erlang 分佈的驅動程式旨在實作可靠、保持順序、可變長度的封包導向協定。所有錯誤更正、重新傳送等都需要在驅動程式或底層通訊協定中實作。如果協定是串流導向的(例如 TCP/IP 和我們的串流式 Unix 網域 Socket),則需要一些封裝機制。我們將使用簡單的方法,即使用四個位元組的標頭,其中包含以大端 32 位元整數表示的封包長度。由於 Unix 網域 Socket 只能在同一部機器上的處理程序之間使用,因此我們不需要以任何特殊的位元組順序編碼整數,但無論如何我們還是會這樣做,因為在大多數情況下您都需要這樣做。Unix 網域 Socket 是可靠且保持順序的,因此我們不需要在驅動程式中實作重新傳送等功能。

我們從宣告原型並填入靜態 ErlDrvEntry 結構開始撰寫範例 Unix 網域 Socket 驅動程式

( 1) #include <stdio.h>
( 2) #include <stdlib.h>
( 3) #include <string.h>
( 4) #include <unistd.h>
( 5) #include <errno.h>
( 6) #include <sys/types.h>
( 7) #include <sys/stat.h>
( 8) #include <sys/socket.h>
( 9) #include <sys/un.h>
(10) #include <fcntl.h>

(11) #define HAVE_UIO_H
(12) #include "erl_driver.h"

(13) /*
(14) ** Interface routines
(15) */
(16) static ErlDrvData uds_start(ErlDrvPort port, char *buff);
(17) static void uds_stop(ErlDrvData handle);
(18) static void uds_command(ErlDrvData handle, char *buff, int bufflen);
(19) static void uds_input(ErlDrvData handle, ErlDrvEvent event);
(20) static void uds_output(ErlDrvData handle, ErlDrvEvent event);
(21) static void uds_finish(void);
(22) static int uds_control(ErlDrvData handle, unsigned int command,
(23)                        char* buf, int count, char** res, int res_size);

(24) /* The driver entry */
(25) static ErlDrvEntry uds_driver_entry = {
(26)     NULL,                            /* init, N/A */
(27)     uds_start,                       /* start, called when port is opened */
(28)     uds_stop,                        /* stop, called when port is closed */
(29)     uds_command,                     /* output, called when erlang has sent */
(30)     uds_input,                       /* ready_input, called when input
(31)                                         descriptor ready */
(32)     uds_output,                      /* ready_output, called when output
(33)                                         descriptor ready */
(34)     "uds_drv",                       /* char *driver_name, the argument
(35)                                         to open_port */
(36)     uds_finish,                      /* finish, called when unloaded */
(37)     NULL,                            /* void * that is not used (BC) */
(38)     uds_control,                     /* control, port_control callback */
(39)     NULL,                            /* timeout, called on timeouts */
(40)     NULL,                            /* outputv, vector output interface */
(41)     NULL,                            /* ready_async callback */
(42)     NULL,                            /* flush callback */
(43)     NULL,                            /* call callback */
(44)     NULL,                            /* event callback */
(45)     ERL_DRV_EXTENDED_MARKER,         /* Extended driver interface marker */
(46)     ERL_DRV_EXTENDED_MAJOR_VERSION,  /* Major version number */
(47)     ERL_DRV_EXTENDED_MINOR_VERSION,  /* Minor version number */
(48)     ERL_DRV_FLAG_SOFT_BUSY,          /* Driver flags. Soft busy flag is
(49)                                         required for distribution drivers */
(50)     NULL,                            /* Reserved for internal use */
(51)     NULL,                            /* process_exit callback */
(52)     NULL                             /* stop_select callback */
(53) };

在第 1-10 行中,包含了驅動程式所需的 OS 標頭。由於此驅動程式是為 Solaris 撰寫的,因此我們知道標頭 uio.h 存在。因此,可以在第 12 行包含 erl_driver.h 之前定義前置處理器變數 HAVE_UIO_HHAVE_UIO_H 的定義將使 Erlang 驅動程式佇列中使用的 I/O 向量對應於作業系統的同等項目,這非常方便。

在第 16-23 行中,宣告了不同的回呼函式(「前向宣告」)。

驅動程式結構對於靜態連結的驅動程式和動態載入的驅動程式是相似的。但是,某些欄位在不同類型的驅動程式中要保持空白(也就是說,初始化為 NULL)。第一個欄位(init 函式指標)在動態載入的驅動程式中始終保持空白,請參閱第 26 行。第 37 行上的 NULL 始終存在,該欄位不再使用,並且保留以實現向後相容性。此驅動程式中未使用任何計時器,因此不需要計時器的回呼。 outputv 欄位(第 40 行)可用於實作類似於 Unix writev 的輸出介面。Erlang 執行時系統以前無法將 outputv 用於分佈,但從 ERTS 5.7.2 開始可以使用。由於此驅動程式是在 ERTS 5.7.2 之前撰寫的,因此它未使用 outputv 回呼。最好使用 outputv 回呼,因為它減少了資料複製。(但是,我們將在驅動程式內部使用分散/聚集 I/O。)

從 ERTS 5.5.3 開始,驅動程式介面擴充了版本控制和傳遞功能資訊的可能性。功能旗標存在於第 48 行。從 ERTS 5.7.4 開始,用於分佈的驅動程式需要旗標 ERL_DRV_FLAG_SOFT_BUSY。軟忙碌旗標表示驅動程式可以處理對 outputoutputv 回呼的呼叫,即使它已將自己標記為忙碌。這一直是分佈所使用的驅動程式的要求,但以前沒有關於此的功能資訊。如需更多資訊,請參閱 erl_driver:set_busy_port())。

此驅動程式是在執行時系統支援 SMP 之前撰寫的。此驅動程式仍將在具有 SMP 支援的執行時系統中運作,但效能將因用於驅動程式的驅動程式鎖定上的鎖定爭用而受到影響。可以透過檢閱並可能重寫程式碼來減輕此問題,以便驅動程式的每個執行個體都可以安全地平行執行。當執行個體可以安全地平行執行時,可以安全地在驅動程式上啟用特定於執行個體的鎖定。這是透過傳遞 ERL_DRV_FLAG_USE_PORT_LOCKING 作為驅動程式旗標來完成的。這留給讀者作為練習。

因此,定義的回呼如下

  • uds_start - 必須啟動埠的資料。我們在這裡不建立任何 Socket,僅初始化資料結構。

  • uds_stop - 在埠關閉時呼叫。

  • uds_command - 處理來自 Erlang 的訊息。這些訊息可以是將要傳送的純資料,也可以是對驅動程式的更細微的指示。此函式主要用於資料傳輸。

  • uds_input - 當有東西要從 Socket 讀取時呼叫。

  • uds_output - 當可以寫入 Socket 時呼叫。

  • uds_finish - 當驅動程式卸載時呼叫。分佈驅動程式永遠不會卸載,但我們在此處包含此程式碼以求完整。能夠在自己之後清理始終是一件好事。

  • uds_control - erlang:port_control/3 回呼,在此實作中經常使用。

此驅動程式實作的埠以兩種主要模式運作,分別命名為 commanddata。在 command 模式下,只能進行被動讀取和寫入(例如 gen_tcp:recv/gen_tcp:send)。埠在分佈交握期間處於此模式。當連線建立後,埠會切換到 data 模式,並且所有資料都會立即讀取並傳遞到 Erlang 模擬器。在 data 模式下,不會解譯到達 uds_command 的任何資料,只會將其封裝並傳送到 Socket。 uds_control 回呼會在這兩種模式之間切換。

net_kernel 通知不同的子系統連線即將建立時,埠將接受要傳送的資料。但是,埠不應接收任何資料,以避免在每個核心子系統準備好處理資料之前,資料從另一個節點到達。第三種模式,名為 intermediate,用於此中間階段。

為不同類型的埠定義了列舉

( 1) typedef enum {
( 2)     portTypeUnknown,      /* An uninitialized port */
( 3)     portTypeListener,     /* A listening port/socket */
( 4)     portTypeAcceptor,     /* An intermediate stage when accepting
( 5)                              on a listen port */
( 6)     portTypeConnector,    /* An intermediate stage when connecting */
( 7)     portTypeCommand,      /* A connected open port in command mode */
( 8)     portTypeIntermediate, /* A connected open port in special
( 9)                              half active mode */
(10)     portTypeData          /* A connected open port in data mode */
(11) } PortType;

不同的類型如下

  • portTypeUnknown - 埠開啟但未繫結到任何檔案描述器時的類型。

  • portTypeListener - 連接到接聽 Socket 的埠。此埠不會執行太多操作,不會在此 Socket 上執行任何資料傳輸,但是當您嘗試在埠上執行接受時,讀取的資料是可用的。

  • portTypeAcceptor - 此埠表示接受作業的結果。當您想從接聽 Socket 接受時會建立它,並且當接受成功時,它會轉換為 portTypeCommand

  • portTypeConnector - 與 portTypeAcceptor 非常相似,是連線作業請求與 Socket 連接到另一端的接受作業之間的中間階段。當 Socket 連線後,埠會將類型切換為 portTypeCommand

  • portTypeCommand - 先前在 command 模式中提到的已連線 Socket(或接受的 Socket)。

  • portTypeIntermediate - 已連線 Socket 的中間階段。此 Socket 不會處理任何輸入。

  • portTypeData - 資料透過埠傳輸的模式,並且 uds_command 常式將每次呼叫都視為需要傳送的呼叫。在此模式下,當輸入到達 Socket 時,會讀取所有可用的輸入並將其傳送到 Erlang,這很像 gen_tcp Socket 的作用中模式。

我們研究埠所需的狀態。請注意,並非所有欄位都用於所有類型的埠。可以使用聯合來節省一些空間,但是這會使程式碼與多個間接存取混雜在一起,因此為了可讀性,這裡對所有類型的埠都使用了一個結構。

( 1) typedef unsigned char Byte;
( 2) typedef unsigned int Word;

( 3) typedef struct uds_data {
( 4)     int fd;                   /* File descriptor */
( 5)     ErlDrvPort port;          /* The port identifier */
( 6)     int lockfd;               /* The file descriptor for a lock file in
( 7)                                  case of listen sockets */
( 8)     Byte creation;            /* The creation serial derived from the
( 9)                                  lock file */
(10)     PortType type;            /* Type of port */
(11)     char *name;               /* Short name of socket for unlink */
(12)     Word sent;                /* Bytes sent */
(13)     Word received;            /* Bytes received */
(14)     struct uds_data *partner; /* The partner in an accept/listen pair */
(15)     struct uds_data *next;    /* Next structure in list */
(16)     /* The input buffer and its data */
(17)     int buffer_size;          /* The allocated size of the input buffer */
(18)     int buffer_pos;           /* Current position in input buffer */
(19)     int header_pos;           /* Where the current header is in the
(20)                                  input buffer */
(21)     Byte *buffer;             /* The actual input buffer */
(22) } UdsData;

此結構用於所有類型的埠,儘管某些欄位對某些類型無用。最節省記憶體的方法是將此結構安排為結構的聯合。但是,為了在程式碼中存取此類結構中的欄位,程式碼中的多個間接存取會使程式碼過於混亂而不適用於範例。

結構中的欄位如下

  • fd - 與埠關聯的 Socket 的檔案描述器。

  • port - 此結構對應的埠的埠識別碼。從驅動程式回呼到模擬器的多數 driver_XXX 呼叫都需要此識別碼。

  • lockfd - 如果 Socket 是接聽 Socket,我們將使用單獨的(常規)檔案來達到兩個目的

    • 我們需要一個不會產生競爭情況的鎖定機制,以確保另一個 Erlang 節點使用我們要求的接聽 Socket 名稱,或者該檔案是否僅保留自先前的(崩潰)工作階段。

    • 我們將 creation 序號儲存在檔案中。creation 是一個數字,該數字應在具有相同名稱的不同 Erlang 模擬器的不同執行個體之間變更,以便當將來自一個模擬器的處理程序識別碼傳送到具有相同分佈名稱的新模擬器時,這些識別碼不會變為有效。建立可以從 0 到 3(兩個位元)並且儲存在傳送到另一個節點的每個處理程序識別碼中。

      在基於 TCP 的分佈系統中,此資料保留在Erlang 埠對應程式守護進程 (epmd) 中,該守護進程會在分散式節點啟動時聯絡。鎖定檔案和 UDS 接聽 Socket 名稱的慣例消除了在使用此分佈模組時對 epmd 的需求。UDS 始終僅限於一部主機,因此避免使用埠對應程式很容易。

  • creation - 接聽 Socket 的建立編號,計算方式為(鎖定檔案中的值 + 1)rem 4。此建立值也會寫回鎖定檔案,以便模擬器的下一次叫用可以在檔案中找到我們的值。

  • type - 埠的目前類型/狀態,可以是上述宣告的值之一。

  • name - Socket 檔案的名稱(已移除路徑首碼),這允許在關閉 Socket 時刪除 (unlink)。

  • sent - 通過 Socket 傳送了多少個位元組。這可能會換行,但這對分佈沒有問題,因為 Erlang 分佈只關心此值是否已變更。(Erlang net_kernel ticker 透過呼叫驅動程式來擷取此值(這是透過 erlang:port_control/3 常式完成的)來使用此值。)

  • received - 從 socket 讀取(接收)了多少位元組,其使用方式與 sent 類似。

  • partner - 指向另一個 port 結構的指標,該結構可能是此 port 正在接受連線的監聽 port,反之亦然。「夥伴關係」永遠是雙向的。

  • next - 指向所有 port 結構的連結串列中下一個結構的指標。此串列在接受連線時以及驅動程式卸載時使用。

  • buffer_sizebuffer_posheader_posbuffer - 用於輸入緩衝的資料。有關輸入緩衝的詳細資訊,請參閱目錄 kernel/examples 中的原始程式碼。這肯定超出了本節的範圍。

分佈驅動程式實作的選定部分

此處並未完整涵蓋分佈驅動程式的實作,未說明有關緩衝和其他與驅動程式撰寫無關的事項。同樣地,也未詳細說明 UDS 協定的某些特性。所選用的協定並不重要。

驅動程式回呼常式的原型可以在 erl_driver.h 標頭檔中找到。

驅動程式初始化常式(通常)使用巨集宣告,以便更容易在不同的作業系統(以及系統的不同版本)之間移植驅動程式。這是唯一必須具有明確名稱的常式。所有其他回呼都透過驅動程式結構來存取。要使用的巨集名為 DRIVER_INIT,並將驅動程式名稱作為參數。

(1) /* Beginning of linked list of ports */
(2) static UdsData *first_data;

(3) DRIVER_INIT(uds_drv)
(4) {
(5)     first_data = NULL;
(6)     return &uds_driver_entry;
(7) }

該常式會初始化單一全域資料結構,並傳回指向驅動程式項目的指標。當從 Erlang 呼叫 erl_ddll:load_driver 時,會呼叫該常式。

當從 Erlang 開啟 port 時,會呼叫 uds_start 常式。在此情況下,我們只會配置一個結構並初始化它。建立實際的 socket 會留給 uds_command 常式。

( 1) static ErlDrvData uds_start(ErlDrvPort port, char *buff)
( 2) {
( 3)     UdsData *ud;
( 4)
( 5)     ud = ALLOC(sizeof(UdsData));
( 6)     ud->fd = -1;
( 7)     ud->lockfd = -1;
( 8)     ud->creation = 0;
( 9)     ud->port = port;
(10)     ud->type = portTypeUnknown;
(11)     ud->name = NULL;
(12)     ud->buffer_size = 0;
(13)     ud->buffer_pos = 0;
(14)     ud->header_pos = 0;
(15)     ud->buffer = NULL;
(16)     ud->sent = 0;
(17)     ud->received = 0;
(18)     ud->partner = NULL;
(19)     ud->next = first_data;
(20)     first_data = ud;
(21)
(22)     return((ErlDrvData) ud);
(23) }

每個資料項目都會初始化,因此當新建立的 port 關閉時(沒有任何對應的 socket),不會發生任何問題。當從 Erlang 呼叫 open_port({spawn, "uds_drv"},[]) 時,會呼叫此常式。

當 Erlang 程序將資料傳送到 port 時,會呼叫 uds_command 常式。當 port 處於 command 模式時,此常式會處理所有非同步命令,當 port 處於 data 模式時,則處理所有資料的傳送。

( 1) static void uds_command(ErlDrvData handle, char *buff, int bufflen)
( 2) {
( 3)     UdsData *ud = (UdsData *) handle;

( 4)     if (ud->type == portTypeData || ud->type == portTypeIntermediate) {
( 5)         DEBUGF(("Passive do_send %d",bufflen));
( 6)         do_send(ud, buff + 1, bufflen - 1); /* XXX */
( 7)         return;
( 8)     }
( 9)     if (bufflen == 0) {
(10)         return;
(11)     }
(12)     switch (*buff) {
(13)     case 'L':
(14)         if (ud->type != portTypeUnknown) {
(15)             driver_failure_posix(ud->port, ENOTSUP);
(16)             return;
(17)         }
(18)         uds_command_listen(ud,buff,bufflen);
(19)         return;
(20)     case 'A':
(21)         if (ud->type != portTypeUnknown) {
(22)             driver_failure_posix(ud->port, ENOTSUP);
(23)             return;
(24)         }
(25)         uds_command_accept(ud,buff,bufflen);
(26)         return;
(27)     case 'C':
(28)         if (ud->type != portTypeUnknown) {
(29)             driver_failure_posix(ud->port, ENOTSUP);
(30)             return;
(31)         }
(32)         uds_command_connect(ud,buff,bufflen);
(33)         return;
(34)     case 'S':
(35)         if (ud->type != portTypeCommand) {
(36)             driver_failure_posix(ud->port, ENOTSUP);
(37)             return;
(38)         }
(39)         do_send(ud, buff + 1, bufflen - 1);
(40)         return;
(41)     case 'R':
(42)         if (ud->type != portTypeCommand) {
(43)             driver_failure_posix(ud->port, ENOTSUP);
(44)             return;
(45)         }
(46)         do_recv(ud);
(47)         return;
(48)     default:
(49)         return;
(50)     }
(51) }

命令常式有三個參數:uds_start 為 port 傳回的控制代碼(是指向內部 port 結構的指標)、資料緩衝區,以及資料緩衝區的長度。緩衝區是從 Erlang 傳送的資料(位元組清單),轉換為 C 陣列(位元組陣列)。

例如,如果 Erlang 將清單 [$a,$b,$c] 傳送到 port,則 bufflen 變數為 3,而 buff 變數包含 {'a','b','c'}(沒有 NULL 終止符)。通常,第一個位元組會用作運算碼,在此驅動程式中也是如此(至少當 port 處於 command 模式時)。運算碼定義如下:

  • 'L'<socket 名稱> - 使用指定的名稱建立並監聽 socket。

  • 'A'<監聽編號,為 32 位元大端序> - 從指定識別編號識別的監聽 socket 接受連線。識別編號會使用 uds_control 常式擷取。

  • 'C'<socket 名稱> - 連線到名為 <socket 名稱> 的 socket。

  • 'S'<資料> - 在連線/接受的 socket 上傳送資料 <資料>(在 command 模式下)。當資料離開此程序時,會確認傳送。

  • 'R' - 接收一個資料封包。

命令 'R' 中的「一個資料封包」可以解釋如下。此驅動程式總是傳送以 4 位元組標頭封裝的資料,該標頭包含一個大端序 32 位元整數,表示封包中資料的長度。由於此驅動程式僅用於分佈,因此不需要不同的封包大小或某種串流模式。當 UDS socket 在主機本機時,為什麼標頭文字會明確地以大端序編碼?撰寫分佈驅動程式時,這是一個很好的做法,因為分佈實際上通常會跨越主機邊界。

在第 4-8 行中,處理了 port 處於 data 模式或 intermediate 模式的情況,而其餘的常式則處理不同的命令。該常式使用 driver_failure_posix() 常式來報告錯誤(例如,參見第 15 行)。請注意,失敗常式會呼叫 uds_stop 常式,這將移除內部 port 資料。因此,在 driver_failure 呼叫之後,控制代碼(和類型轉換後的控制代碼 ud)是無效指標,我們應立即返回。執行階段系統會向所有連結的程序傳送結束訊號。

當先前傳遞給 driver_select 常式之檔案描述器上有可用的資料時,會呼叫 uds_input 常式。當發出讀取命令且沒有資料可用時,通常會發生這種情況。 do_recv 常式如下所示:

( 1) static void do_recv(UdsData *ud)
( 2) {
( 3)     int res;
( 4)     char *ibuf;
( 5)     for(;;) {
( 6)         if ((res = buffered_read_package(ud,&ibuf)) < 0) {
( 7)             if (res == NORMAL_READ_FAILURE) {
( 8)                 driver_select(ud->port, (ErlDrvEvent) ud->fd, DO_READ, 1);
( 9)             } else {
(10)                 driver_failure_eof(ud->port);
(11)             }
(12)             return;
(13)         }
(14)         /* Got a package */
(15)         if (ud->type == portTypeCommand) {
(16)             ibuf[-1] = 'R'; /* There is always room for a single byte
(17)                                opcode before the actual buffer
(18)                                (where the packet header was) */
(19)             driver_output(ud->port,ibuf - 1, res + 1);
(20)             driver_select(ud->port, (ErlDrvEvent) ud->fd, DO_READ,0);
(21)             return;
(22)         } else {
(23)             ibuf[-1] = DIST_MAGIC_RECV_TAG; /* XXX */
(24)             driver_output(ud->port,ibuf - 1, res + 1);
(25)             driver_select(ud->port, (ErlDrvEvent) ud->fd, DO_READ,1);
(26)         }
(27)     }
(28) }

該常式會嘗試讀取資料,直到讀取一個封包,或 buffered_read_package 常式傳回 NORMAL_READ_FAILURE(模組內部定義的常數,表示讀取操作導致 EWOULDBLOCK)為止。如果 port 處於 command 模式,則讀取會在讀取一個封包時停止。如果 port 處於 data 模式,則讀取會繼續,直到 socket 緩衝區為空(讀取失敗)為止。如果無法再讀取更多資料,但想要更多資料(當 socket 處於 data 模式時,總是如此),則會呼叫 driver_select,以便當有更多資料可供讀取時,呼叫 uds_input 回呼。

當 port 處於 data 模式時,所有資料都會以適合分佈的格式傳送到 Erlang。實際上,原始資料永遠不會到達任何 Erlang 程序,而是會由模擬器本身轉換/解譯,然後以正確的格式傳遞到正確的程序。在目前的模擬器版本中,接收到的資料將標記為 100 的單一位元組。這就是巨集 DIST_MAGIC_RECV_TAG 的定義目的。分佈中資料的標記可能會在未來變更。

uds_input 常式會處理其他輸入事件(例如非阻塞 accept),但最重要的是透過呼叫 do_recv 來處理到達 socket 的資料。

( 1) static void uds_input(ErlDrvData handle, ErlDrvEvent event)
( 2) {
( 3)     UdsData *ud = (UdsData *) handle;

( 4)     if (ud->type == portTypeListener) {
( 5)         UdsData *ad = ud->partner;
( 6)         struct sockaddr_un peer;
( 7)         int pl = sizeof(struct sockaddr_un);
( 8)         int fd;

( 9)         if ((fd = accept(ud->fd, (struct sockaddr *) &peer, &pl)) < 0) {
(10)             if (errno != EWOULDBLOCK) {
(11)                 driver_failure_posix(ud->port, errno);
(12)                 return;
(13)             }
(14)             return;
(15)         }
(16)         SET_NONBLOCKING(fd);
(17)         ad->fd = fd;
(18)         ad->partner = NULL;
(19)         ad->type = portTypeCommand;
(20)         ud->partner = NULL;
(21)         driver_select(ud->port, (ErlDrvEvent) ud->fd, DO_READ, 0);
(22)         driver_output(ad->port, "Aok",3);
(23)         return;
(24)     }
(25)     do_recv(ud);
(26) }

重要的行是函數中的最後一行:呼叫 do_read 常式來處理新的輸入。其餘的函數會處理監聽 socket 上的輸入,這表示可以在 socket 上執行 accept,這也被識別為讀取事件。

輸出機制與輸入類似。 do_send 常式如下所示:

( 1) static void do_send(UdsData *ud, char *buff, int bufflen)
( 2) {
( 3)     char header[4];
( 4)     int written;
( 5)     SysIOVec iov[2];
( 6)     ErlIOVec eio;
( 7)     ErlDrvBinary *binv[] = {NULL,NULL};

( 8)     put_packet_length(header, bufflen);
( 9)     iov[0].iov_base = (char *) header;
(10)     iov[0].iov_len = 4;
(11)     iov[1].iov_base = buff;
(12)     iov[1].iov_len = bufflen;
(13)     eio.iov = iov;
(14)     eio.binv = binv;
(15)     eio.vsize = 2;
(16)     eio.size = bufflen + 4;
(17)     written = 0;
(18)     if (driver_sizeq(ud->port) == 0) {
(19)         if ((written = writev(ud->fd, iov, 2)) == eio.size) {
(20)             ud->sent += written;
(21)             if (ud->type == portTypeCommand) {
(22)                 driver_output(ud->port, "Sok", 3);
(23)             }
(24)             return;
(25)         } else if (written < 0) {
(26)             if (errno != EWOULDBLOCK) {
(27)                 driver_failure_eof(ud->port);
(28)                 return;
(29)             } else {
(30)                 written = 0;
(31)             }
(32)         } else {
(33)             ud->sent += written;
(34)         }
(35)         /* Enqueue remaining */
(36)     }
(37)     driver_enqv(ud->port, &eio, written);
(38)     send_out_queue(ud);
(39) }

此驅動程式使用 writev 系統呼叫,將資料傳送到 socket 上。 writev 和驅動程式輸出佇列的組合非常方便。 ErlIOVec 結構包含 SysIOVec(這相當於 uio.h 中定義的 struct iovec 結構)。 ErlIOVec 還包含 ErlDrvBinary 指標陣列,其長度與 I/O 向量本身中的緩衝區數量相同。可以在驅動程式中「手動」為佇列配置二進位檔,但此處二進位檔陣列會填入 NULL 值(第 7 行)。然後,當呼叫 driver_enqv 時(第 37 行),執行階段系統會配置其自己的緩衝區。

該常式會建立一個 I/O 向量,其中包含標頭位元組和緩衝區(運算碼已移除,且緩衝區長度已由輸出常式減少)。如果佇列為空,我們會將資料直接寫入 socket(或至少嘗試寫入)。如果留下任何資料,則會將其儲存在佇列中,然後我們嘗試傳送佇列(第 38 行)。當訊息完全傳遞時,會傳送確認(第 22 行)。如果傳送已完成,則 send_out_queue 會傳送確認。如果 port 處於 command 模式,則 Erlang 程式碼會序列化傳送操作,以便一次只能有一個封包等待傳遞。因此,只要佇列為空,就可以傳送確認。

send_out_queue 常式如下所示:

( 1) static int send_out_queue(UdsData *ud)
( 2) {
( 3)     for(;;) {
( 4)         int vlen;
( 5)         SysIOVec *tmp = driver_peekq(ud->port, &vlen);
( 6)         int wrote;
( 7)         if (tmp == NULL) {
( 8)             driver_select(ud->port, (ErlDrvEvent) ud->fd, DO_WRITE, 0);
( 9)             if (ud->type == portTypeCommand) {
(10)                 driver_output(ud->port, "Sok", 3);
(11)             }
(12)             return 0;
(13)         }
(14)         if (vlen > IO_VECTOR_MAX) {
(15)             vlen = IO_VECTOR_MAX;
(16)         }
(17)         if ((wrote = writev(ud->fd, tmp, vlen)) < 0) {
(18)             if (errno == EWOULDBLOCK) {
(19)                 driver_select(ud->port, (ErlDrvEvent) ud->fd,
(20)                               DO_WRITE, 1);
(21)                 return 0;
(22)             } else {
(23)                 driver_failure_eof(ud->port);
(24)                 return -1;
(25)             }
(26)         }
(27)         driver_deq(ud->port, wrote);
(28)         ud->sent += wrote;
(29)     }
(30) }

我們只是從佇列中挑選出一個 I/O 向量(這是整個佇列,如 SysIOVec)。如果 I/O 向量太長(IO_VECTOR_MAX 定義為 16),則會縮短向量長度(第 15 行),否則 writev 呼叫(第 17 行)會失敗。會嘗試寫入,且任何寫入的項目都會取消佇列(第 27 行)。如果寫入失敗並出現 EWOULDBLOCK(請注意,所有 socket 都在非阻塞模式下),則會呼叫 driver_select,以便當有空間再次寫入時,呼叫 uds_output 常式。

我們會繼續嘗試寫入,直到佇列為空或寫入被封鎖為止。

上述常式是從 uds_output 常式呼叫的:

( 1) static void uds_output(ErlDrvData handle, ErlDrvEvent event)
( 2) {
( 3)    UdsData *ud = (UdsData *) handle;
( 4)    if (ud->type == portTypeConnector) {
( 5)        ud->type = portTypeCommand;
( 6)        driver_select(ud->port, (ErlDrvEvent) ud->fd, DO_WRITE, 0);
( 7)        driver_output(ud->port, "Cok",3);
( 8)        return;
( 9)    }
(10)    send_out_queue(ud);
(11) }

該常式很簡單:它首先處理輸出選擇將會關注連線中的 socket(且連線被封鎖)的事實。如果 socket 處於連線狀態,則它只會傳送輸出佇列。當可以寫入有輸出佇列的 socket 時,會呼叫此常式,因此毫無疑問該怎麼做。

此驅動程式實作了一個控制介面,這是一個同步介面,會在 Erlang 呼叫 erlang:port_control/3 時被呼叫。只有這個介面可以在驅動程式處於 data 模式時控制它。可以使用以下的操作碼呼叫它:

  • 'C' - 將埠設定為 command 模式。

  • 'I' - 將埠設定為 intermediate 模式。

  • 'D' - 將埠設定為 data 模式。

  • 'N' - 取得監聽埠的識別號碼。這個識別號碼在對驅動程式發出接受命令時使用。它會以大端序 32 位元整數的形式返回,也就是監聽 socket 的檔案識別符。

  • 'S' - 取得統計資訊,包含接收的位元組數、傳送的位元組數,以及輸出佇列中待處理的位元組數。當分佈式系統檢查連線是否存活(心跳)時,會使用這些資料。統計資料會以三個大端序 32 位元整數的形式返回。

  • 'T' - 傳送心跳訊息,也就是長度為 0 的封包。心跳會在埠處於 data 模式時進行,因此無法使用傳送資料的命令(此外,在 command 模式下,它會忽略長度為零的封包)。當沒有其他流量時,心跳機制會使用這個功能來傳送虛擬資料。

    注意: 傳送心跳的介面不應該是阻塞的,這一點非常重要。此實作使用 erlang:port_control/3,它不會阻塞呼叫者。如果使用 erlang:port_command,請使用 erlang:port_command/3 並將 [force] 作為選項列表傳遞;否則,呼叫者可能會在繁忙的埠上無限期地被阻塞,並阻止系統關閉一個無法正常運作的連線。

  • 'R' - 取得監聽 socket 的建立號碼,此號碼用於找出鎖定檔案中儲存的號碼,以區分具有相同名稱的 Erlang 節點的呼叫。

控制介面會取得一個緩衝區來返回其值,但如果提供的緩衝區太小,則可以自由分配自己的緩衝區。uds_control 的程式碼如下:

( 1) static int uds_control(ErlDrvData handle, unsigned int command,
( 2)                        char* buf, int count, char** res, int res_size)
( 3) {
( 4) /* Local macro to ensure large enough buffer. */
( 5) #define ENSURE(N)                               \
( 6)    do {                                         \
( 7)        if (res_size < N) {                      \
( 8)            *res = ALLOC(N);                     \
( 9)        }                                        \
(10)    } while(0)

(11)    UdsData *ud = (UdsData *) handle;

(12)    switch (command) {
(13)    case 'S':
(14)        {
(15)            ENSURE(13);
(16)            **res = 0;
(17)            put_packet_length((*res) + 1, ud->received);
(18)            put_packet_length((*res) + 5, ud->sent);
(19)            put_packet_length((*res) + 9, driver_sizeq(ud->port));
(20)            return 13;
(21)        }
(22)    case 'C':
(23)        if (ud->type < portTypeCommand) {
(24)            return report_control_error(res, res_size, "einval");
(25)        }
(26)        ud->type = portTypeCommand;
(27)        driver_select(ud->port, (ErlDrvEvent) ud->fd, DO_READ, 0);
(28)        ENSURE(1);
(29)        **res = 0;
(30)        return 1;
(31)    case 'I':
(32)        if (ud->type < portTypeCommand) {
(33)            return report_control_error(res, res_size, "einval");
(34)        }
(35)        ud->type = portTypeIntermediate;
(36)        driver_select(ud->port, (ErlDrvEvent) ud->fd, DO_READ, 0);
(37)        ENSURE(1);
(38)        **res = 0;
(39)        return 1;
(40)    case 'D':
(41)        if (ud->type < portTypeCommand) {
(42)            return report_control_error(res, res_size, "einval");
(43)        }
(44)        ud->type = portTypeData;
(45)        do_recv(ud);
(46)        ENSURE(1);
(47)        **res = 0;
(48)        return 1;
(49)    case 'N':
(50)        if (ud->type != portTypeListener) {
(51)            return report_control_error(res, res_size, "einval");
(52)        }
(53)        ENSURE(5);
(54)        (*res)[0] = 0;
(55)        put_packet_length((*res) + 1, ud->fd);
(56)        return 5;
(57)    case 'T': /* tick */
(58)        if (ud->type != portTypeData) {
(59)            return report_control_error(res, res_size, "einval");
(60)        }
(61)        do_send(ud,"",0);
(62)        ENSURE(1);
(63)        **res = 0;
(64)        return 1;
(65)    case 'R':
(66)        if (ud->type != portTypeListener) {
(67)            return report_control_error(res, res_size, "einval");
(68)        }
(69)        ENSURE(2);
(70)        (*res)[0] = 0;
(71)        (*res)[1] = ud->creation;
(72)        return 2;
(73)    default:
(74)        return report_control_error(res, res_size, "einval");
(75)    }
(76) #undef ENSURE
(77) }

巨集 ENSURE(第 5-10 行)用於確保緩衝區足夠大以容納答案。我們根據命令進行切換並採取行動。我們始終在 data 模式的埠上啟用讀取選擇(透過在第 45 行呼叫 do_recv 來實現),但在 intermediatecommand 模式中,我們會關閉讀取選擇(第 27 和 36 行)。

驅動程式的其餘部分或多或少是 UDS 特有的,並且不具有普遍的興趣。

整合所有內容

為了測試分佈式系統,可以使用 net_kernel:start/1 函數。它很有用,因為它會在正在運行的系統上啟動分佈式系統,這樣就可以進行追蹤/除錯。net_kernel:start/1 常式會將一個列表作為其唯一的參數。列表中的第一個元素應該是節點名稱(不含 "@hostname")作為原子。第二個(也是最後一個)元素應該是原子 shortnameslongnames 之一。在範例案例中,建議使用 shortnames

為了讓 net_kernel 找出要使用的分佈式模組,請使用命令列參數 -proto_dist。後面跟著一個或多個分佈式模組名稱,其中刪除了尾碼 "_dist",也就是說,作為分佈式模組的 uds_dist 會被指定為 -proto_dist uds

如果未使用任何 epmd(TCP 連接埠對應精靈),也需要指定命令列選項 -no_epmd,這會使 Erlang 跳過 epmd 的啟動,無論是作為作業系統程序還是 Erlang 對應項。

必須在啟動時知道分佈式模組所在的目錄路徑。這可以透過在命令列中指定 -pa <path> 或建置一個包含用於分佈式協定的應用程式的啟動腳本來實現。(在 uds_dist 協定中,只需要將 uds_dist 應用程式新增到腳本中。)

如果指定了以上所有內容,並且命令列中存在 -sname <name> 標誌,則分佈式系統會在啟動時啟動。

範例 1

$ erl -pa $ERL_TOP/lib/kernel/examples/uds_dist/ebin -proto_dist uds -no_epmd
Erlang (BEAM) emulator version 5.0

Eshell V5.0  (abort with ^G)
1> net_kernel:start([bing,shortnames]).
{ok,<0.30.0>}
(bing@hador)2>

範例 2

$ erl -pa $ERL_TOP/lib/kernel/examples/uds_dist/ebin -proto_dist uds \
      -no_epmd -sname bong
Erlang (BEAM) emulator version 5.0

Eshell V5.0  (abort with ^G)
(bong@hador)1>

可以使用 ERL_FLAGS 環境變數來儲存複雜的參數。

$ ERL_FLAGS=-pa $ERL_TOP/lib/kernel/examples/uds_dist/ebin \
      -proto_dist uds -no_epmd
$ export ERL_FLAGS
$ erl -sname bang
Erlang (BEAM) emulator version 5.0

Eshell V5.0  (abort with ^G)
(bang@hador)1>

ERL_FLAGS 不應包含節點名稱。