This tutorial describes how to build a monitoring service that checks whether DASH or HLS manifests exist and are reachable, using several AWS services: AppSync (GraphQL), Amplify, Lambda functions, SQS, and Step Functions state machines.
First, create a new Amplify GraphQL project to hold the data:
- create new app in https://eu-west-1.console.aws.amazon.com/amplify/home?region=eu-west-1#/
- create a new React app: npx create-react-app cm-graphql
- cd into cm-graphql and pull the app created above: amplify pull --appId dp8qhbmkum6zu --envName staging
- create a graphql api: amplify add api
- fill in the information:
Select from one of the below mentioned services: GraphQL
Here is the GraphQL API that we will create. Select a setting to edit or continue:
Name: manifest-graphql-monitoring
Authorization modes: API key (default, expiration time: 7 days from now)
Conflict detection (required for DataStore): Disabled
Enable conflict detection? Yes
Select the default resolution strategy: Auto Merge
Here is the GraphQL API that we will create. Select a setting to edit or continue: Continue
Choose a schema template: Blank Schema
Then add your schema to the generated graphql file.
- run amplify push and answer the prompts:
Do you want to generate code for your newly created GraphQL API? Yes
Choose the code generation language target: javascript
Enter the file name pattern of graphql queries, mutations and subscriptions: src/graphql/**/*.js
Do you want to generate/update all possible GraphQL operations (queries, mutations and subscriptions)? Yes
Enter maximum statement depth [increase from default if your schema is deeply nested]
- npm install aws-amplify @aws-amplify/ui-react
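The mutation used later in this tutorial (createIPInformationModel) implies a model along these lines; this is a sketch of what the schema added to the generated graphql file could look like (the field types are assumptions, and createdAt is generated automatically by the @model directive):

```graphql
type IPInformationModel @model {
  id: ID!
  ip_address: String
  url_name: String
  site_up: String
}
```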
Now that the GraphQL API endpoint and React app are in place, create the SQS queue that will hold the messages with the manifest URLs to be analysed:
Console: https://eu-west-1.console.aws.amazon.com/sqs/v2/home?region=eu-west-1#/queues
1 – Click on create queue
2 – Choose a queue name; it must end in .fifo
3 – Choose the FIFO (first in, first out) type: it guarantees exactly-once processing, whereas a standard queue may deliver the same message more than once
4 – Choose a short retention period, such as 4 minutes, so that unprocessed messages are purged quickly
5 – For simplicity, set the access policy so that only the queue owner can send and receive messages
6 – Create the queue
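Messages can also be pushed to the queue programmatically with boto3. The sketch below assumes a hypothetical queue URL (substitute your own from the SQS console); note that FIFO queues require a MessageGroupId, and a MessageDeduplicationId unless content-based deduplication is enabled:

```python
import json
import uuid

# Placeholder: substitute your own queue URL from the SQS console
QUEUE_URL = "https://sqs.eu-west-1.amazonaws.com/123456789012/manifest-checks.fifo"


def build_manifest_message(url):
    """Builds the send_message arguments for a manifest URL."""
    return {
        "QueueUrl": QUEUE_URL,
        "MessageBody": json.dumps({"url": url}),
        # All checks share one group so they are processed in order
        "MessageGroupId": "manifest-checks",
        # Unique id so repeated URLs are not deduplicated away
        "MessageDeduplicationId": str(uuid.uuid4()),
    }


if __name__ == "__main__":
    import boto3  # AWS SDK for Python; requires AWS credentials
    sqs = boto3.client("sqs")
    sqs.send_message(**build_manifest_message("https://example.com/master.m3u8"))
```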
Next, create a Lambda function to process the events on the queue and then save the returned result in GraphQL:
Console: https://eu-west-1.console.aws.amazon.com/lambda/
1 – Create a new Lambda function
2 – Add an SQS trigger with the .fifo queue just created
3 – The Lambda function will also be listed as a trigger on the queue's console page
4 – In the Lambda function (Python in this case), read the message by accessing event["Records"][0]["body"]
5 – Transform the payload if needed, then send it to a Step Functions state machine for processing
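The steps above can be sketched as follows; the state machine ARN is a placeholder, and the message body is assumed to be the JSON payload produced when the URL was queued:

```python
import json


def extract_manifest_url(event):
    """Pulls the manifest URL out of the first SQS record in the event."""
    body = json.loads(event["Records"][0]["body"])
    return body["url"]


def lambda_handler(event, context):
    url = extract_manifest_url(event)
    # Hand the URL to the state machine; the ARN below is a placeholder
    import boto3  # requires AWS credentials at runtime
    sfn = boto3.client("stepfunctions")
    sfn.start_execution(
        stateMachineArn="arn:aws:states:eu-west-1:123456789012:stateMachine:manifest-monitor",
        input=json.dumps({"url": url}),
    )
    return {"statusCode": 200, "url": url}
```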
AWS Step Functions state machines allow you to compose simple and complex workflows across different AWS services:
Console: https://eu-west-1.console.aws.amazon.com/states/home?region=eu-west-1#/statemachines
1 – Create a new state machine
2 – Create a new Lambda function that will be the entry step of the state machine
3 – In this Lambda, access the object sent by the previous Lambda function (the one that received the event from the queue)
4 – Check whether the host is up using the check-host.net service and the requests library in Python
5 – A function such as the following performs the check:
import json
import time

import requests


def lambda_handler(event, context):
    url = event.get("url", None)
    check_host_json = {}
    check_host_result_json = {}
    if url:
        headers = {"Accept": "application/json"}
        # Ask check-host.net to probe the URL from up to 10 nodes
        check_host_response = requests.get(
            "https://check-host.net/check-http?host={url}&max_nodes=10".format(url=url),
            headers=headers)
        print(check_host_response.text)
        try:
            check_host_json = json.loads(check_host_response.text)
            print(json.dumps(check_host_json, indent=True))
        except Exception as e:
            check_host_json = {"error": str(e)}
        # Fetch the check result using the request id returned above
        request_id = check_host_json.get("request_id", None)
        if request_id:
            try:
                time.sleep(5)  # give the check nodes some time to report back
                check_host_result = requests.get(
                    "https://check-host.net/check-result/{request_id}".format(request_id=request_id),
                    headers=headers)
                check_host_result_json = json.loads(check_host_result.text)
            except Exception as e:
                check_host_result_json = {"error": str(e)}
        else:
            check_host_result_json = {"error": "No request id"}
    return {
        'statusCode': 200,
        'check_host_response': json.dumps(check_host_json, indent=True),
        'check_host_result': json.dumps(check_host_result_json, indent=True),
        'url': url
    }
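The state machine chaining this check with the save step described next can be sketched in Amazon States Language; the two Lambda ARNs below are placeholders:

```json
{
  "Comment": "Check a manifest host, then save the result to GraphQL",
  "StartAt": "CheckHost",
  "States": {
    "CheckHost": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:eu-west-1:123456789012:function:check-host",
      "Next": "SaveToGraphQL"
    },
    "SaveToGraphQL": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:eu-west-1:123456789012:function:save-to-graphql",
      "End": true
    }
  }
}
```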
Add the result of the check to GraphQL by creating a new Lambda function, adding it as the next step in the state machine, and using code such as the following to save the data; make sure the GraphQL endpoint and API key are set in the Lambda's environment variables:
import json

import appsync  # the helper module shown below


def _create_response_check_host_model(ip_address, url_name, site_up):
    """
    Saves the check-host result as a new IPInformationModel entry.
    """
    input = {
        "ip_address": ip_address,
        "url_name": url_name,
        "site_up": site_up
    }
    query = """
    mutation CreateIPInformationModelMutation($input: CreateIPInformationModelInput!) {
        createIPInformationModel(input: $input) {
            id
            ip_address
            url_name
            createdAt
            site_up
        }
    }
    """
    print("Input to be saved: ", json.dumps(input, indent=True))
    res = appsync.query(query, {'input': input})
    print("Created model", json.dumps(res, indent=True))
    return res["data"]["createIPInformationModel"]
The appsync.query helper is implemented as follows:
import json
import os

import boto3
import requests
from requests_aws_sign import AWSV4Sign


def query(query, variables: dict):
    session = boto3.session.Session()
    credentials = session.get_credentials()
    region = session.region_name or 'eu-west-1'
    endpoint = os.environ.get('APPSYNC_URL', None)
    headers = {"Content-Type": "application/json", "x-api-key": os.environ.get('API_KEY', None)}
    payload = {"query": query, 'variables': variables}
    appsync_region = __parse_region_from_url(endpoint) or region
    auth = AWSV4Sign(credentials, appsync_region, 'appsync')
    try:
        response = requests.post(
            endpoint,
            auth=auth,
            json=payload,
            headers=headers
        ).json()
        if 'errors' in response:
            print('Error attempting to query AppSync')
            print(response['errors'])
        else:
            return response
    except Exception as exception:
        print('Error with Mutation')
        print(exception)
        return None


def __parse_region_from_url(url):
    """Parses the region from the appsync url so we call the correct region regardless of the session or the argument"""
    # Example URL: https://xxxxxxx.appsync-api.us-east-2.amazonaws.com/graphql
    split = url.split('.')
    if 2 < len(split):
        return split[2]
    return None
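As a quick sanity check of the region parsing, the helper returns the third dot-separated component of the endpoint URL; the standalone copy below (same logic, public name) shows it on a typical AppSync endpoint:

```python
def parse_region_from_url(url):
    # Same logic as __parse_region_from_url above: the region is the
    # third dot-separated component of an AppSync endpoint URL
    split = url.split('.')
    if 2 < len(split):
        return split[2]
    return None


print(parse_region_from_url("https://xxxxxxx.appsync-api.us-east-2.amazonaws.com/graphql"))
# prints us-east-2
```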
Test your workflow by adding a URL to the SQS FIFO queue created at the beginning!