How to create a simple HLS/DASH manifest monitoring service with AWS services

This tutorial describes how to create a monitoring service for DASH or HLS manifests, checking that they exist and are reachable, using several AWS services: Amplify, AppSync (GraphQL), Lambda functions, SQS and Step Functions state machines.

First of all, create a new Amplify GraphQL project to hold the data:

  1. Create a new app in https://eu-west-1.console.aws.amazon.com/amplify/home?region=eu-west-1#/
  2. Create a new React app: npx create-react-app cm-graphql
  3. cd into cm-graphql and pull the app created in the console: amplify pull --appId dp8qhbmkum6zu --envName staging
  4. Create a GraphQL API: amplify add api
  5. Fill in the information:
    Select from one of the below mentioned services: GraphQL

      Here is the GraphQL API that we will create. Select a setting to edit or continue (Use arrow keys)

         Name: manifest-graphql-monitoring

         Authorization modes: API key (default, expiration time: 7 days from now)

         Conflict detection (required for DataStore): Disabled

       Enable conflict detection? Yes

       Select the default resolution strategy: Auto Merge

       Here is the GraphQL API that we will create. Select a setting to edit or continue: Continue

       Choose a schema template: Blank Schema

       Edit the generated schema and add your data model (a minimal schema sketch follows this list).

6. Run amplify push and answer the prompts:

Do you want to generate code for your newly created GraphQL API: Yes

Choose the code generation language target: javascript

Enter the file name pattern of graphql queries, mutations and subscriptions: src/graphql/**/*.js

Do you want to generate/update all possible GraphQL operations - queries, mutations and subscriptions: Yes

Enter maximum statement depth [increase from default if your schema is deeply nested]: accept the default

7. npm install aws-amplify @aws-amplify/ui-react
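
When editing the schema in step 5, a minimal data model matching the createIPInformationModel mutation used later in this tutorial could look like the following in amplify/backend/api/manifest-graphql-monitoring/schema.graphql (the @model directive makes Amplify generate the CRUD operations and the createdAt field automatically):

type IPInformationModel @model {
  id: ID!
  ip_address: String
  url_name: String
  site_up: String
}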

Now that we have created the GraphQL API endpoint and the React app, we can create the SQS queue that will hold the messages with the manifest URLs to be analysed:

Console: https://eu-west-1.console.aws.amazon.com/sqs/v2/home?region=eu-west-1#/queues

1 – Click on Create queue

2 – Choose a queue name; it must end in .fifo

3 – Choose the FIFO (first-in, first-out) type: it provides exactly-once processing, whereas a standard queue may deliver the same message more than once

4 – Choose a short retention period, such as 4 minutes, so that unprocessed messages are purged quickly

5 – For simplicity, set the access policy so that only the queue owner can send and receive messages

6 – Create the queue
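
If you prefer, the same queue can be created with boto3; the queue name here is a placeholder, and the retention period matches the 4 minutes chosen above:

import boto3

sqs = boto3.client("sqs", region_name="eu-west-1")

sqs.create_queue(
    QueueName="manifest-monitor.fifo",  # FIFO queue names must end in .fifo
    Attributes={
        "FifoQueue": "true",
        "MessageRetentionPeriod": "240"  # 4 minutes, expressed in seconds
    }
)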

Lambda functions to process the events on the queue and subsequently save the result in GraphQL:

Console: https://eu-west-1.console.aws.amazon.com/lambda/ 

1 – Create a new Lambda function

2 – Add an SQS trigger with the .fifo queue just created

3 – The Lambda function will also be listed on the queue's page

4 – In the Lambda function (Python in this case), read the message body from the event: event["Records"][0]["body"]

5 – Transform the payload if needed, then send it to a Step Functions state machine for processing (a minimal sketch follows this list)
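
A minimal sketch of this forwarding Lambda; the state machine ARN is a placeholder for the one created in the next section:

import json
import boto3

# Placeholder ARN: replace with the ARN of the state machine created below
STATE_MACHINE_ARN = "arn:aws:states:eu-west-1:123456789012:stateMachine:manifest-monitor"

sfn = boto3.client("stepfunctions")

def lambda_handler(event, context):
    # The SQS trigger can deliver several records in one invocation;
    # each message body contains a manifest URL to analyse
    for record in event["Records"]:
        url = record["body"]
        # Start a state machine execution for each URL
        sfn.start_execution(
            stateMachineArn=STATE_MACHINE_ARN,
            input=json.dumps({"url": url})
        )
    return {"statusCode": 200}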

Step Functions state machines allow building simple and complex workflows that orchestrate different AWS services.

Console: https://eu-west-1.console.aws.amazon.com/states/home?region=eu-west-1#/statemachines

1 – Create a new state machine

2 – Create a new Lambda function that will be the entry point of the state machine; further steps can each have their own function

3 – In this Lambda function, access the object sent by the previous Lambda function, which received the event from the queue

4 – In this new Lambda, check whether the host is up using the check-host.net service and the requests module in Python

5 – Use a function like the following to check whether the host is up:

import json
import requests

def lambda_handler(event, context):
    url = event.get("url", None)
    if not url:
        return {"statusCode": 400, "error": "No url in event"}

    headers = {"Accept": "application/json"}

    # Ask check-host.net to check the URL from up to 10 nodes
    try:
        check_host_response = requests.get(
            "https://check-host.net/check-http?host={url}&max_nodes=10".format(url=url),
            headers=headers)
        check_host_json = json.loads(check_host_response.text)
        print(json.dumps(check_host_json, indent=True))
    except Exception as e:
        check_host_json = {"error": str(e)}

    # Fetch the check result using the returned request id. The nodes
    # report asynchronously, so in practice you may need to wait or poll
    # before all results are available.
    request_id = check_host_json.get("request_id", None)
    if request_id:
        try:
            check_host_result = requests.get(
                "https://check-host.net/check-result/{request_id}".format(request_id=request_id),
                headers=headers)
            check_host_result_json = json.loads(check_host_result.text)
        except Exception as e:
            check_host_result_json = {"error": str(e)}
    else:
        check_host_result_json = {"error": "No request id"}

    return {
        'statusCode': 200,
        'check_host_response': json.dumps(check_host_json, indent=True),
        'check_host_result': json.dumps(check_host_result_json, indent=True),
        'url': url
    }
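
The two Lambda steps can then be wired together in the state machine definition. A minimal Amazon States Language sketch with placeholder ARNs (the save-to-GraphQL function is created in the next section):

{
  "Comment": "Check a manifest URL, then save the result to GraphQL",
  "StartAt": "CheckHost",
  "States": {
    "CheckHost": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:eu-west-1:123456789012:function:check-host",
      "Next": "SaveToGraphQL"
    },
    "SaveToGraphQL": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:eu-west-1:123456789012:function:save-to-graphql",
      "End": true
    }
  }
}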

Add the result of the check to GraphQL by creating a new Lambda function and adding it as the next step in the state machine, with code like the following to save the result. Make sure the GraphQL endpoint and API key are set in the Lambda's environment variables:

import json

import appsync  # the helper module shown below

def _create_response_check_host_model(ip_address, url_name, site_up):
    """
    Creates an IPInformationModel entry with the check-host result.
    """
    model_input = {
        "ip_address": ip_address,
        "url_name": url_name,
        "site_up": site_up
    }
    query = """
    mutation CreateIPInformationModelMutation($input: CreateIPInformationModelInput!) {
      createIPInformationModel(input: $input) {
        id
        ip_address
        url_name
        createdAt
        site_up
      }
    }
    """
    print("Input to be saved: ", json.dumps(model_input, indent=True))
    res = appsync.query(query, {'input': model_input})
    print("Created model", json.dumps(res, indent=True))
    return res["data"]["createIPInformationModel"]

The appsync.query helper is defined as follows:

import json
import os
import boto3
import requests
from requests_aws_sign import AWSV4Sign

def query(query, variables: dict):
    # Sign the request with the Lambda's IAM credentials; the x-api-key
    # header below covers the API-key authorization mode configured earlier
    session = boto3.session.Session()
    credentials = session.get_credentials()
    region = session.region_name or 'eu-west-1'

    # Endpoint and API key come from the Lambda's environment variables
    endpoint = os.environ.get('APPSYNC_URL', None)
    headers = {"Content-Type": "application/json", "x-api-key": os.environ.get('API_KEY', None)}
    payload = {"query": query, 'variables': variables}

    appsync_region = __parse_region_from_url(endpoint) or region
    auth = AWSV4Sign(credentials, appsync_region, 'appsync')
    try:
        response = requests.post(
            endpoint,
            auth=auth,
            json=payload,
            headers=headers
        ).json()
        if 'errors' in response:
            print('Error attempting to query AppSync')
            print(response['errors'])
        else:
            return response
    except Exception as exception:
        print('Error with Mutation')
        print(exception)

    return None

def __parse_region_from_url(url):
    """Parses the region from the appsync url so we call the correct region regardless of the session or the argument"""
    # Example URL: https://xxxxxxx.appsync-api.us-east-2.amazonaws.com/graphql
    split = url.split('.')
    if 2 < len(split):
        return split[2]
    return None

Test your workflow by adding a URL to the SQS FIFO queue created at the beginning!
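
For example, with boto3 (the queue URL is a placeholder; FIFO queues require a MessageGroupId, and a MessageDeduplicationId unless content-based deduplication is enabled):

import boto3

sqs = boto3.client("sqs", region_name="eu-west-1")

# Placeholder queue URL: replace with the URL of the .fifo queue created earlier
queue_url = "https://sqs.eu-west-1.amazonaws.com/123456789012/manifest-monitor.fifo"

sqs.send_message(
    QueueUrl=queue_url,
    MessageBody="https://example.com/master.m3u8",
    MessageGroupId="manifest-checks",      # required for FIFO queues
    MessageDeduplicationId="check-0001"    # required unless content-based deduplication is enabled
)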