Message Bus
ご覧のページは、お客様の利便性のために一部機械翻訳されています。また、ドキュメントは頻繁に更新が加えられており、翻訳は未完成の部分が含まれることをご了承ください。最新情報は都度公開されておりますため、必ず英語版をご参照ください。翻訳に問題がある場合は、 こちら までご連絡ください。

旧バージョンでダイレクトシンクロナスメッセージを使用する場合

Liferay DXP 7.4 U48/Portal GA49 以下

ダイレクト同期メッセージングは、すべてのリスナーがメッセージを受信するまで処理をブロックする最も簡単な方法です。 SynchronousMessageSendersend(String, Message)メソッドを呼び出し、宛先名とメッセージインスタンスを渡します。 SynchronousMessageSenderは、現在のスレッドを使用して、宛先に登録されている各メッセージリスナーで直接メッセージ受信を処理します。 リスナーの処理が完了すると、send(String, Message)メソッドを呼び出したクラスで実行が続行されます。 この例は、ダイレクト同期メッセージングの使用をデモしています。

ダイレクト同期メッセージを送信する

サンプルプロジェクトでは、SynchronousMessageSenderを使用して、2つのリスナーに直接メッセージを送信します。

新しいLiferay インスタンスを起動し、以下を実行します。

docker run -it -m 8g -p 8080:8080 liferay/portal:7.4.3.48-ga48

http://localhost:8080でLiferayにサインインします。 メールアドレス test@liferay.com とパスワード testを使用してください。 プロンプトが表示されたら、パスワードを learnに変更します。

次に、以下の手順に従います。

  1. サンプルをダウンロードして解凍します。

    curl https://resources.learn.liferay.com/examples/liferay-x6n5.zip -O
    
    unzip liferay-x6n5.zip
    
  2. サンプルのプロジェクトモジュールをビルドしてデプロイします。

    cd liferay-x6n5
    
    ./gradlew deploy -Ddeploy.docker.container.id=$(docker ps -lq)
    

    このコマンドは、モジュールJARをDockerコンテナ上の /opt/liferay/osgi/modules にコピーするのと同じです。

  3. Dockerコンテナコンソールに、モジュールが起動されたことが示されます。

    STARTED com.acme.x6n5.able.impl_1.0.0
    STARTED com.acme.x6n5.baker.impl_1.0.0
    STARTED com.acme.x6n5.charlie.impl_1.0.0
    STARTED com.acme.x6n5.dog.impl_1.0.0
    
  4. ブラウザでhttp://localhost:8080にあるLiferayインスタンスにアクセスし、認証情報を使用してサインインします。

  5. Gogo Shellを開きます。

  6. Gogoシェルコマンドフィールドに、x6n5:sendMessageと入力し、その後にメッセージを入力します。 例えば、

    x6n5:sendMessage foo
    
  7. 出力が次のようになっていることを確認します。

    INFO  [pipe-x6n5:sendMessage foo][X6N5DogMessageListener:21] Received message payload foo
    INFO  [pipe-x6n5:sendMessage foo][X6N5CharlieMessageListener:21] Received message payload foo
    INFO  [pipe-x6n5:sendMessage foo][X6N5BakerOSGiCommands:28] Response: X6N5CharlieMessageListener
    

スレッドは、メッセージを送信するときにメッセージ送信者(つまり、X6N5BakerOSGiCommands)でブロックされます。 X6N5CharlieMessageListenerおよびX6N5DogMessageListenerでメッセージを処理した後、スレッドはメッセージ送信者で続行されます。

プロジェクト概要

4つのサンプルモジュールには1つのクラスがあります。 1つのクラスは宛先を管理し、別のクラスはメッセージを送信し、他の2つは宛先に送信されたメッセージをリッスンします。

クラスの例:

クラスモジュール説明
X6N5AbleMessagingConfiguratorx6n5-able-implacme/x6n5_ableという名前のメッセージ宛先を作成し、メッセージバスに登録します。
X6N5BakerOSGiCommandsx6n5-baker-implacme/x6n5_able宛先にメッセージを送信し、応答をログに記録します。
X6N5CharlieMessageListenerx6n5-charlie-implacme/x6n5_able宛先に送信されたメッセージをリッスンします。 メッセージペイロードをログに記録し、メッセージに応答を設定します。
X6N5DogMessageListenerx6n5-dog-implacme/x6n5_able宛先に送信されたメッセージをリッスンします。 メッセージペイロードをログに記録し、メッセージに応答を設定します。

イベントフローは次のとおりです。

  1. ユーザーがx6n5:sendMessageGogoシェルコマンドを実行すると、X6N5BakerOSGiCommandsはメッセージペイロードのコマンド引数をacme/x6n5_able宛先に送信します。

  2. 現在のスレッドは、各リスナー(つまり、X6N5CharlieMessageListenerX6N5DogMessageListener)のメッセージ受信を連続して処理します。 リスナーはメッセージペイロードをログに記録し、メッセージに応答を設定します。 処理された最新のリスナーからの応答は、以前の応答に優先します。

  3. 処理はX6N5BakerOSGiCommandsに戻り、メッセージ応答をログに記録します。

これで、宛先コンフィギュレーターから順に、各クラスを調べることができます.

宛先コンフィグレーターを調べる

x6n5-able-implモジュールのX6N5AbleMessagingConfiguratorクラスは、acme/x6n5_ableという名前の宛先を作成して構成します。 コードは次のとおりです。

@Component
public class X6N5AbleMessagingConfigurator {

	@Activate
	private void _activate(BundleContext bundleContext) {
		Destination destination = _destinationFactory.createDestination(
			DestinationConfiguration.createSynchronousDestinationConfiguration(
				"acme/x6n5_able"));

		_serviceRegistration = bundleContext.registerService(
			Destination.class, destination,
			MapUtil.singletonDictionary(
				"destination.name", destination.getName()));
	}

	@Deactivate
	private void _deactivate() {
		if (_serviceRegistration != null) {
			_serviceRegistration.unregister();
		}
	}

	@Reference
	private DestinationFactory _destinationFactory;

	private ServiceRegistration<Destination> _serviceRegistration;

}

このコンフィギュレータは コンポーネント クラスです。 @Reference アノテーションを使用して、 DestinationFactory インスタンスを挿入します。

_activate(BundleContext) メソッドは、 DestinationFactoryDestinationConfiguration を使用して、 acme/x6n5_ableという名前の 同期 宛先を作成します。 同期宛先は、同期メッセージング用に最適化されています。 最後に、メソッドは、 BundleContextを使用して、OSGi サービスに Destination を登録します。

X6N5AbleMessagingConfiguratorが無効化されると、その_deactivate()メソッドは宛先サービスの登録を解除します。

送信者を調べる

x6n5-baker-implモジュールのX6N5BakerOSGiCommandsクラスは、メッセージを宛先に送信するOSGiコマンドを提供します。

@Component(
	property = {"osgi.command.function=sendMessage", "osgi.command.scope=x6n5"},
	service = X6N5BakerOSGiCommands.class
)
public class X6N5BakerOSGiCommands {

	public void sendMessage(String payload) throws MessageBusException {
		Message message = new Message();

		message.setPayload(payload);

		Object response = _synchronousMessageSender.send(
			"acme/x6n5_able", message);

		System.out.println("Response: " + response);
	}

	@Reference(target = "(mode=DIRECT)")
	private SynchronousMessageSender _synchronousMessageSender;

}

X6N5BakerOSGiCommandsは、独自のクラスタイプのサービスComponentです。 これは、 @Reference アノテーションを使用して、 SynchronousMessageSender を挿入します。これは、 直接 モード (アノテーションの target = "(mode=DIRECT)" 属性によって指定されます) に設定されます。

直接 モードでは、 SynchronousMessageSender send メソッドは、現在のスレッドがすべてのリスナーにメッセージを配信するまで、呼び出し元のクラスをブロックします。

X6N5BakerOSGiCommands@Componentプロパティは、sendMessageと呼ばれるGogoシェルコマンド関数をx6n5スコープで定義します。 コマンドはsendMessage(String)メソッドにマッピングされ、入力Stringを受け取ります。

sendMessage(String) メソッドは、Gogo シェル コマンドの String をペイロードとして Message を作成します。 SynchronousMessageSender send(String, Message) メソッドは、現在のスレッドを使用して、メッセージを acme/x6n5_able Destination メッセージ リスナーに配信します。 スレッドがすべての MessageListener内のメッセージを処理するまで、 X6N5BakerOSGiCommands クラスの実行がブロックされます。 その後、X6N5BakerOSGiCommands sendMessage(String)メソッドで実行が続行され、メッセージ応答がログに記録されます。

リスナーを調べる

x6n5-charlie-impl モジュールの X6N5CharlieMessageListener クラスと x6n5-dog-impl モジュールの X6N5DogMessageListener クラスは、 acme/x6n5_able Destinationに送信されたメッセージをリッスンします。 これらは、 メッセージのリッスン で示されているのと同じ方法で登録されます。

X6N5CharlieMessageListenerクラス:

@Component(
	property = "destination.name=acme/x6n5_able",
	service = MessageListener.class
)
public class X6N5CharlieMessageListener implements MessageListener {

	@Override
	public void receive(Message message) {
		System.out.println("Received message payload " + message.getPayload());

		message.setResponse("X6N5CharlieMessageListener");
	}

}

X6N5DogMessageListenerクラス:

@Component(
	property = "destination.name=acme/x6n5_able",
	service = MessageListener.class
)
public class X6N5DogMessageListener implements MessageListener {

	@Override
	public void receive(Message message) {
		System.out.println("Received message payload " + message.getPayload());

		message.setResponse("X6N5DogMessageListener");
	}

}

各リスナーのreceive(Message)メソッドは、メッセージペイロードをログに記録してから、メッセージ応答を独自のクラス名に設定します。

  ダイレクト同期メッセージングの使用方法が分かりました。

次のステップ

デフォルト モードを使用して同期メッセージングについて詳しく知りたい場合は、「 以前のバージョンでのデフォルトの同期メッセージングの使用」を参照してください。

メッセージを送信した後すぐに処理を続行する場合は、「 非同期メッセージングの使用」を参照してください。