Message Bus
ご覧のページは、お客様の利便性のために一部機械翻訳されています。また、ドキュメントは頻繁に更新が加えられており、翻訳は未完成の部分が含まれることをご了承ください。最新情報は都度公開されておりますため、必ず英語版をご参照ください。翻訳に問題がある場合は、 こちら までご連絡ください。

非同期メッセージングの使用

メッセージバスの非同期オプションは、「ファイア・アンド・フォーゲット」動作を提供します。メッセージを送信し、応答を待たずに処理を続行します。

非同期メッセージは、 シリアル または パラレル の宛先に送信されます。

  • シリアル宛先の場合、メッセージバスはメッセージをキューに入れ、メッセージごとに1つのワーカースレッドを委任します。 スレッドはメッセージリスナーを順番に処理します。

  • パラレル宛先の場合、メッセージバスはメッセージをキューに入れ、1つのメッセージリスナーにつきメッセージごとに1つのワーカースレッドを委任します。 スレッドはメッセージリスナーを同時に処理します。

別のクラス(メッセージリスナー)がリッスンしているシリアル宛先にメッセージを送信することから始めます。

メッセージを送る

新しいLiferay インスタンスを起動し、以下を実行します。

docker run -it -m 8g -p 8080:8080 liferay/portal:7.4.3.132-ga132

http://localhost:8080でLiferayにサインインします。 メールアドレス test@liferay.com とパスワード testを使用してください。 プロンプトが表示されたら、パスワードを learnに変更します。

サンプルプロジェクトでメッセージを送信することから始めます。

  1. サンプルをダウンロードして解凍します。

    curl https://resources.learn.liferay.com/examples/liferay-n8k5.zip -O
    
    unzip liferay-n8k5.zip
    
  2. 宛先モジュールn8k5-able-implをビルドしてデプロイします。

    cd liferay-n8k5/n8k5-able-impl
    
    ../gradlew deploy -Ddeploy.docker.container.id=$(docker ps -lq)
    

    このコマンドは、モジュール JAR を Docker コンテナ上の /opt/liferay/osgi/modules にコピーするのと同じです。

    Dockerコンテナコンソールに、モジュールが起動されたことが示されます。

    STARTED com.acme.n8k5.able.impl_1.0.0
    
  3. リスナーモジュールn8k5-charlie-implをビルドしてデプロイします。

    cd ../n8k5-charlie-impl
    
    ../gradlew deploy -Ddeploy.docker.container.id=$(docker ps -lq)
    

    Dockerコンテナコンソールに、モジュールが起動されたことが示されます。

    STARTED com.acme.n8k5.charlie.impl_1.0.0
    
  4. 送信者モジュールn8k5-baker-implをビルドしてデプロイします。

    cd ../n8k5-baker-impl
    
    ../gradlew deploy -Ddeploy.docker.container.id=$(docker ps -lq)
    

    Docker コンテナ コンソールで、 N8K5Baker がメッセージを送信し、 N8K5CharlieMessageListener がメッセージを受信し、 n8k5-baker-impl モジュールが起動したことを確認します。

    INFO  [pipe-start 2025][N8K5Baker:24] Sent message to acme/n8k5_able
    INFO  [acme/n8k5_able-4][N8K5CharlieMessageListener:21] Received message payload N8K5Baker#_activate
    STARTED com.acme.n8k5.baker.impl_1.0.0 [2025]
    

N8K5Bakerは、宛先acme/n8k5_ableにメッセージを送信したことを報告しました。 N8K5CharlieMessageListenerは、宛先acme/n8k5_ableでペイロードN8K5Baker#_activateを含むメッセージを受信しました。 これで、サンプルコードを調べることができます。

プロジェクト概要

この例の3つのモジュールには、それぞれ1つのクラスがあります。 各クラスは、メッセージングコンポーネントの1つ(宛先、送信者、リスナー)を表します。

クラスの例:

クラス説明
n8k5-able-impl の N8K5AbleMessagingConfiguratoracme/n8k5_ableという名前のメッセージ宛先を作成し、メッセージバスに登録します。
n8k5-baker-impl の N8K5Bakeracme/n8k5_able宛先にメッセージを送信します。
n8k5-charlie-impl の N8K5CharlieMessageListeneracme/n8k5_able宛先に送信されたメッセージをリッスンします。

これらがどのように相互作用するかを以下に示します。

  1. N8K5Bakerが有効になり(たとえば、n8k5-baker-implモジュールが起動したとき)、acme/n8k5_able宛先にメッセージを送信します。
  2. メッセージバスがメッセージをN8K5CharlieMessageListenerに送信します。
  3. N8K5CharlieMessageListenerがメッセージを受信します。

宛先構成と送信者クラスを調べます。 リスナー クラス N8K5CharlieMessageListener は、 メッセージのリッスン で示されているのと同じ方法で登録されます。

宛先構成を調べる

n8k5-able-implモジュールのN8K5AbleMessagingConfiguratorクラスは、宛先を作成して構成します。 コードは次のとおりです。

@Component
public class N8K5AbleMessagingConfigurator {

	@Activate
	private void _activate(BundleContext bundleContext) {
		Destination destination = _destinationFactory.createDestination(
			DestinationConfiguration.createSerialDestinationConfiguration(
				"acme/n8k5_able"));

		_serviceRegistration = bundleContext.registerService(
			Destination.class, destination,
			MapUtil.singletonDictionary(
				"destination.name", destination.getName()));
	}

	@Deactivate
	private void _deactivate() {
		if (_serviceRegistration != null) {
			_serviceRegistration.unregister();
		}
	}

	@Reference
	private DestinationFactory _destinationFactory;

	private ServiceRegistration<Destination> _serviceRegistration;

}

どのクラスでも宛先を作成して構成できますが、 コンポーネント には、 DestinationFactoryのような注入された依存関係がある場合があります。 _destinationFactory フィールドの @Reference アノテーションは、Liferay の OSGi フレームワークに DestinationFactory インスタンスを挿入するように指示します。

_activate メソッドで、 N8K5AbleMessagingConfiguratorDestinationFactoryDestinationConfiguration を使用して、 serial 宛先 acme/n8k5_ableを作成します。 次に、OSGiフレームワークBundleContextを使用して、Destinationに対するサービスを登録します。 N8K5AbleMessagingConfiguratorが無効化されると、_deactivateメソッドはサービスの登録を解除します。

送信者を調べる

以下の N8K5Baker クラスは、ペイロード "N8K5Baker#_activate" を持つメッセージを acme/n8k5_ableという名前の宛先に送信します。

@Activate
private void _activate() {
	Message message = new Message();

	message.setPayload("N8K5Baker");

	_messageBus.sendMessage("acme/n8k5_able", message);
}

@Reference
private MessageBus _messageBus;

コンポーネントとして、N8K5Baker@Referenceアノテーションを使用してMessageBusインスタンスを挿入します。

コンポーネントのアクティブ化時に、N8K5Bakerは、アクティブ化メソッド_activate()を介してメッセージを作成して送信します。 Message インスタンスを構築し、それにペイロードを追加します。 ペイロードは、Messageに入力できるもののうちの1つです。

主なメッセージ入力方法は次のとおりです。

メソッド説明
setPayload(Object)Messageのメインコンテンツを追加します。
setResponseDestinationName(String)応答を受信するためのDestinationを参照します。
setValues(Map<String,Object>)Mapから追加データを提供します。

N8K5Baker は、 MessageBussendMessage(String, Message) メソッドを呼び出して、 Destinationacme/n8k5_able にメッセージを送信します。 MessageBus は新しいスレッドを開始し、 MessageMessageListener インスタンス ( acme/n8k5_able Destinationに登録されています) に送信します。 N8K5Bakerのスレッドが継続します。

Messageへの応答を受信する場合は、 Message に応答の宛先を設定し、 N8K5Bakerなどのクラスを MessageListener としてその宛先に登録します。 詳細については、 メッセージのリッスン を参照してください。

応答処理の追加

メッセージ受信者からの応答が必要な場合は、返信の応答先を設定します。

  1. メッセージ応答用に別の宛先を登録します。
  2. クラス(例えば、元の送信者)をMessageListenerとして応答先に登録します。
  3. メッセージで応答先を渡します。
  4. MessageListenerに応答ロジックを追加します。

ステップ1:応答の宛先を登録する

N8K5AbleDestinationConfiguratorが宛先を管理するのと同じ方法で、応答先を管理するようにN8K5Bakerを変更できます。 _activate()メソッドのシグネチャを_activate(BundleContext bundleContext)に置き換え、acme/n8k5_baker応答先のサービスを作成、構成、および登録するコードを追加します。 サービスの登録を解除する_deactivate()メソッドを追加します。 _activate(BundleContext bundleContext)メソッドと_deactivate()メソッドは次のようになります。

@Activate
private void _activate(BundleContext bundleContext) {
   Destination destination = _destinationFactory.createDestination(
      DestinationConfiguration.createSerialDestinationConfiguration(
         "acme/n8k5_baker"));

   _serviceRegistration = bundleContext.registerService(
      Destination.class, destination,
      MapUtil.singletonDictionary(
         "destination.name", destination.getName()));

   Message message = new Message();

   message.setPayload("N8K5Baker#_activate");

   _messageBus.sendMessage("acme/n8k5_able", message);
}

@Deactivate
private void _deactivate() {
   if (_serviceRegistration != null) {
      _serviceRegistration.unregister();
   }
}

@Reference
private DestinationFactory _destinationFactory;

private ServiceRegistration<Destination> _serviceRegistration;

ステップ2:N8K5Bakerを応答先のリスナーとして登録する

送信者N8K5Bakerの変更点は次のとおりです。

  1. @Component アノテーションを更新し、 N8K5Baker タイプ MessageListener.class のサービスを宣言し、プロパティ "destination.name=acme/n8k5_baker"を介して N8K5Baker をその応答宛先にマッピングします。
  2. MessageListener インターフェースを実装します。
  3. receive(Message)メソッドをメッセージ処理ロジックでオーバーライドします。

送信者の変更は次のようになります。

@Component(
	property = "destination.name=acme/n8k5_baker",
	service = MessageListener.class
)
public class N8K5Baker implements MessageListener {

	@Override
	public void receive(Message message) {
		Object payload = message.getPayload();

		_log.info("Received message payload " + payload.toString());
	}

   // Existing methods and fields

   private static final Log _log = LogFactoryUtil.getLog(N8K5Baker.class);
}

ステップ3:メッセージの応答先を渡す

N8K5Bakerが送信するメッセージの応答先としてacme/n8k5_bakerを設定します。 次のようになります。

@Activate
private void _activate(BundleContext bundleContext) {
   // Destination setup

   Message message = new Message();

   message.setPayload("N8K5Baker#_activate");
   message.setResponseDestinationName("acme/n8k5_baker");

   _messageBus.sendMessage("acme/n8k5_able", message);
}

ステップ4:MessageListenerに応答ロジックを追加する

MessageListenerreceive(Message)メソッドで、応答を設定し、メッセージから応答先を取得し、MessageBusインスタンスを使用して応答メッセージを応答先に送信します。 次のようになります。

public void receive(Message message) {
   // Message processing

   message.setResponse("N8K5CharlieMessageListener");

   Message responseMessage = new Message();

   responseMessage.setDestinationName(
      message.getResponseDestinationName());
   responseMessage.setPayload("N8K5CharlieMessageListener");
   responseMessage.setResponseId(message.getResponseId());

   _messageBus.sendMessage(
      message.getResponseDestinationName(), responseMessage);
}

// Existing methods and fields

@Reference
private MessageBus _messageBus;

変更をテストする

サンプルプロジェクトを再デプロイして、変更をテストします。

cd ../../liferay-n8k5.zip
./gradlew deploy -Ddeploy.docker.container.id=$(docker ps -lq)

出力は次のようになります。

STARTED com.acme.n8k5.charlie.impl_1.0.0 [2020]
STARTED com.acme.n8k5.baker.impl_1.0.0 [2025]
INFO  [acme/n8k5_able-2][N8K5CharlieMessageListener:23] Received message payload N8K5Baker#_activate
INFO  [acme/n8k5_baker-2][N8K5Baker:30] Received message payload N8K5CharlieMessageListener

N8K5CharlieMessageListenerは、N8K5Bakerのメッセージを受信してから、応答メッセージを応答先に送信します。 N8K5Bakerは応答メッセージを受信し、メッセージペイロードを出力します。

クラスが再度メッセージを交換するようにしたい場合は、 Gogo Shellでモジュール (OSGi バンドル) を再起動できます。 バンドルを一覧表示して (lb)、バンドル ID を取得し、バンドルを停止して (stop <id>)、バンドルを再起動します (start <id>)。

OSGi コンポーネントではないクラスでは、 MessageBusUtilDestinationDestinationConfigurationMessage、および MessageListener インスタンスを使用してメッセージを送信できます。

示されているように Destination サービスを登録できますが、 BundleContext を別の方法で取得する必要があります (たとえば、次の呼び出しを行う必要があります: Bundle bundle = FrameworkUtil.getBundle(YourClass.class); BundleContext bundleContext = bundle.getBundleContext())。

  2つのクラス間で非同期的にメッセージを交換しました。

次のステップ

非同期メッセージングに慣れてきたので、最適なパフォーマンスになるように調整できます。 方法については、 メッセージング パフォーマンスのチューニングを参照してください。

デフォルト モードと 直接 モードを使用した同期メッセージングについて詳しく知りたい場合は、詳細については 以前のバージョンでの直接同期メッセージングの使用 および 以前のバージョンでのデフォルトの同期メッセージングの使用 を参照してください。