Integration
PubSub and PHP
We have not created a full PHP example; instead, this section provides some guidance.
Blob storage
The blob storage is easily accessible, so a library is usually not necessary. It is, however, possible to use azure-sdk-for-php.
Either way, getting the url for the blob consists of the same steps:
- Get the base url for the blob storage from the field BlobBaseUrlsWithToken in TenantService. It includes the SharedAccessToken in its query string: https://myaccount.blob.core.windows.net/batchnotification?sv=2015-12-11&sr=c&sig=XXXX&st=2018-09-19T11:28:58Z&se=2018-09-19T12:28:58Z&sp=r
- The service bus message contains the uri of the blob to download in the field uri: https://myaccount.blob.core.windows.net/mycontainer/myblob
- Append the query string of the first url to the second url: https://myaccount.blob.core.windows.net/mycontainer/myblob?sv=2015-12-11&sr=c&sig=XXXX&st=2018-09-19T11:28:58Z&se=2018-09-19T12:28:58Z&sp=r
- Verify that the two base urls match (optional)
- Download the file found at that url.
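The steps above can be sketched in plain PHP without any library. This is a minimal sketch, not a tested integration: the function name buildBlobDownloadUrl is hypothetical, and the optional check is assumed here to mean "same scheme and host" (a stricter check could also compare the container).

```php
<?php
// Hypothetical helper: combines the SAS query string from TenantService
// (BlobBaseUrlsWithToken) with the blob uri from the service bus message.
function buildBlobDownloadUrl(string $baseUrlWithToken, string $blobUri): string
{
    $base = parse_url($baseUrlWithToken);
    $blob = parse_url($blobUri);
    if ($base === false || $blob === false || empty($base['query'])) {
        throw new InvalidArgumentException('Unexpected url format');
    }
    // Optional verification step. Assumption: "base urls match" means
    // that scheme and host are identical.
    if (($base['scheme'] ?? '') !== ($blob['scheme'] ?? '')
        || ($base['host'] ?? '') !== ($blob['host'] ?? '')) {
        throw new InvalidArgumentException('Blob uri does not match the base url');
    }
    // Drop any query string already on the blob uri, then append the
    // SAS query string unchanged (it is already url-encoded).
    $blobWithoutQuery = explode('?', $blobUri, 2)[0];
    return $blobWithoutQuery . '?' . $base['query'];
}
```

The resulting url can then be downloaded with any HTTP client, for example file_get_contents($url) or cURL.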
Service Bus
Azure Service Bus supports two protocols: REST API and AMQP.
Connect by: AMQP
If you have an AMQP library that supports the required SharedAccessSignature authentication scheme, we recommend using it (we have, however, not found one yet). More details about AMQP and Azure Service Bus
Connect by: REST API
Azure Service Bus supports getting messages using normal HTTP calls to its REST API. The normal flow of this API is:
- GET: This returns the oldest message together with a message id. The message is hidden until the lock period expires (usually 30 seconds), so it must be processed within this time. A message can only be fetched 10 times (without calling PUT to abandon it) before it is moved to a dead-letter queue.
- DELETE: Once the client has processed the message, the client deletes it.
- PUT: If the client does not want to process the message, this call abandons it and makes it available in the API again.
It is possible to perform these operations as low-level HTTP calls, but using a library is recommended.
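For readers who do want the low-level route, the three operations can be described as plain request descriptors. This is only a sketch that builds the method and url without sending anything. The verbs and paths are assumptions based on the public Azure Service Bus REST reference, where the peek-lock receive (the GET step above) appears to be issued as a POST to messages/head; please verify them before use. All function names are hypothetical.

```php
<?php
// Base url for the messages of one subscription.
// Assumption: {endpoint}/{topic}/subscriptions/{subscription}/messages
function subscriptionMessagesUrl(string $endpoint, string $topic, string $subscription): string
{
    return rtrim($endpoint, '/') . '/' . trim($topic, '/')
        . '/subscriptions/' . rawurlencode($subscription) . '/messages';
}

// Receive with peek-lock: the message stays locked until deleted or unlocked.
function peekLockRequest(string $messagesUrl, int $timeoutSeconds): array
{
    return ['method' => 'POST', 'url' => $messagesUrl . '/head?timeout=' . $timeoutSeconds];
}

// Delete a processed message, identified by message id and lock token.
function deleteRequest(string $messagesUrl, string $messageId, string $lockToken): array
{
    return [
        'method' => 'DELETE',
        'url' => $messagesUrl . '/' . rawurlencode($messageId) . '/' . rawurlencode($lockToken),
    ];
}

// Unlock (abandon) a message so that it becomes available again.
function unlockRequest(string $messagesUrl, string $messageId, string $lockToken): array
{
    return [
        'method' => 'PUT',
        'url' => $messagesUrl . '/' . rawurlencode($messageId) . '/' . rawurlencode($lockToken),
    ];
}
```

Each request would additionally need an Authorization header carrying the SharedAccessSignature token; sending the request and reading the message id and lock token from the response are left out of this sketch.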
REST API Library
The currently recommended library is azure-sdk-for-php. However, this library does not support the SharedAccessSignature scheme that we use to give access to our Service Bus. We have written an extension to remedy this, and we hope that it will eventually become part of the standard library.
Client code example (very simplified; please see the .NET example for more details):
// Pre-requisites
// sudo apt install zip unzip php7.0-zip
// php composer.phar require microsoft/windowsazure
require_once __DIR__ . '/../vendor/autoload.php';
require_once __DIR__ . '/azure-sdk-for-php-sas-extension.php'; // These are extensions that we have added, should be merged into official library
use WindowsAzure\Common\ServicesBuilder;
use WindowsAzure\Common\ServiceException;
use WindowsAzure\ServiceBus\Models\ReceiveMessageOptions;
$options = getOptions();
$topicName = "batchnotification";
$subscriptionName = "something";
$isRunning = true;
$serviceBusRestProxy = ServicesBuilderToken::getInstance()->createServiceBusService(getSASFromTenantService());
while($isRunning)
{
    try {
        // Get message.
        $message = $serviceBusRestProxy->receiveSubscriptionMessage($topicName, $subscriptionName, $options); // Listens for message until message is found OR timeout occurs.
        if(!is_null($message))
        {
            processMessage($message);
            $serviceBusRestProxy->deleteMessage($message); // Message processing done, delete the message and get ready for the next one.
            // Todo: If an exception occurs here, you might consider unlocking the message using $serviceBusRestProxy->unlockMessage($message) to make it available before its locktime is exceeded
        } else {
            echo "No message found\r\n";
        }
    } catch(ServiceException $e){
        $code = $e->getCode();
        if($code == 401) // SAS token expired, re-get it.
        {
            sleep(1); // Todo: Implement some back-off logic here
            $serviceBusRestProxy = ServicesBuilderToken::getInstance()->createServiceBusService(getSASFromTenantService());
        }
        // Todo: Decide if the message should be deleted here, depends on the type of error. If nothing is done, the message will come back later.
        $error_message = $e->getMessage();
        echo $code.": ".$error_message."\r\n";
    }
}
function processMessage(WindowsAzure\ServiceBus\Models\BrokeredMessage $message)
{
    echo "Body: ".$message->getBody()."\r\n";
    echo "MessageID: ".$message->getMessageId()."\r\n";
}
function getSASFromTenantService()
{
    // Todo: Get from tenantservice
    return "Endpoint=https://myaccount.servicebus.windows.net/;SharedAccessSignature=XXXX;EntityPath=batchnotification";
}
function getOptions()
{
    $options = new ReceiveMessageOptions();
    //$options->setReceiveAndDelete();
    $options->setPeekLock();
    $options->setTimeout(20);
    if($options->getIsReceiveAndDelete())
        $mode = "RECEIVE_AND_DELETE";
    else
        $mode = "PEEK_LOCK";
    echo "Listening for messages with ".$mode." and timeout ".$options->getTimeout()."\r\n";
    // Return the options so the caller can pass them to receiveSubscriptionMessage
    return $options;
}
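The back-off Todo in the 401 branch above could be filled in along the following lines. This is a minimal sketch of capped exponential back-off; the function name backoffSeconds and its default values are assumptions chosen for illustration, not values recommended by the service.

```php
<?php
// Hypothetical helper: the delay doubles with every failed attempt
// and is capped at $maxSeconds.
function backoffSeconds(int $attempt, int $baseSeconds = 1, int $maxSeconds = 60): int
{
    // Treat negative attempts as 0 and cap the exponent so the
    // multiplication cannot overflow.
    $exponent = min(max(0, $attempt), 20);
    $delay = $baseSeconds * (2 ** $exponent);
    return (int) min($delay, $maxSeconds);
}
```

In the client loop, sleep(1) could then become sleep(backoffSeconds($failedAttempts++)), where $failedAttempts is a hypothetical counter that is reset to 0 after a successful receive.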
Required extensions to the library (temporary until they become part of the library; please validate them yourself):
use WindowsAzure\Common\ServicesBuilder;
use WindowsAzure\Common\ServiceException;
use WindowsAzure\Common\Internal\ServiceBusSettings;
use WindowsAzure\ServiceBus\Models\ReceiveMessageOptions;
use WindowsAzure\Common\Internal\Utilities;
use WindowsAzure\Common\Internal\Resources;
use WindowsAzure\ServiceBus\ServiceBusRestProxy;
class ServiceBusTokenSettings extends ServiceBusSettings
{
    public $sharedAccessSignature;

    public function __construct(
        $serviceBusEndpoint,
        $filter,
        $sharedAccessSignature
    )
    {
        parent::__construct($serviceBusEndpoint, $filter);
        $this->sharedAccessSignature = $sharedAccessSignature;
    }

    public static function createFromConnectionString($connectionString) {
        // Todo: Move to Resources constants
        self::$validSettingKeys[] = "SharedAccessSignature";
        self::$validSettingKeys[] = "EntityPath";
        $tokenizedSettings = self::parseAndValidateKeys($connectionString);
        $sharedAccessSignature = $tokenizedSettings["SharedAccessSignature"];
        $endpoint = Utilities::tryGetValueInsensitive(
            Resources::SERVICE_BUS_ENDPOINT_NAME,
            $tokenizedSettings
        );
        return new self($endpoint, null, $sharedAccessSignature);
        // Todo: Add methods for creating these settings for SharedAccessSignature
        /*if (array_key_exists(Resources::SHARED_SHARED_ACCESS_KEY_NAME, $tokenizedSettings)) {
            return self::createServiceBusWithSasAuthentication($tokenizedSettings, $connectionString);
        }
        return self::createServiceBusWithWrapAuthentication($tokenizedSettings, $connectionString);*/
    }
}
class ServicesBuilderToken extends ServicesBuilder
{
    private static $_instance = null;

    public static function getInstance()
    {
        if (!isset(self::$_instance)) {
            self::$_instance = new self();
        }
        return self::$_instance;
    }

    public function createServiceBusService($connectionString)
    {
        $settings = ServiceBusTokenSettings::createFromConnectionString(
            $connectionString
        );
        $httpClient = $this->httpClient();
        if(!is_null($settings->sharedAccessSignature))
        {
            $httpClient->setHeader("Authorization", $settings->sharedAccessSignature);
        }
        $serializer = $this->serializer();
        $serviceBusWrapper = new ServiceBusRestProxy(
            $httpClient,
            $settings->getServiceBusEndpointUri(),
            $serializer
        );
        // Todo: Create a SASToken-filter
        /*
        // Adding headers filter
        $headers = [];
        $headersFilter = new HeadersFilter($headers);
        $serviceBusWrapper = $serviceBusWrapper->withFilter($headersFilter);
        $filter = $settings->getFilter();
        return $serviceBusWrapper->withFilter($filter);
        */
        return $serviceBusWrapper;
    }
}
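To make it easier to validate the extension, the connection string handling can be reproduced without the library. The following is a standalone sketch of how a string such as the one returned by getSASFromTenantService() breaks down into settings. It is not the library's parser, and the function name parseConnectionString is hypothetical.

```php
<?php
// Hypothetical stand-in for the library's connection string parsing:
// splits "Key=Value;Key=Value" into an associative array.
function parseConnectionString(string $connectionString): array
{
    $settings = [];
    foreach (explode(';', $connectionString) as $pair) {
        $pair = trim($pair);
        if ($pair === '') {
            continue; // Tolerate a trailing ';'
        }
        // Split on the first '=' only: a SharedAccessSignature value
        // contains further '=' characters (sr=...&sig=...).
        $parts = explode('=', $pair, 2);
        if (count($parts) !== 2 || $parts[0] === '') {
            throw new InvalidArgumentException('Malformed setting: ' . $pair);
        }
        $settings[$parts[0]] = $parts[1];
    }
    return $settings;
}
```

The SharedAccessSignature entry is the value that the extension passes on unchanged as the Authorization header.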