Cloudflare DNS: using the python api

Intro

The examples provided on github are kind of wrong. I created an example script which actually works. If you simply copy their example and try the one where you add a DNS record using the python interface to the api, you will get this error:

CloudFlare.exceptions.CloudFlareAPIError: Requires permission "com.cloudflare.api.account.zone.create" to create zones for the selected account

Read on to see the corrected script.

Then some months later I created a script – still using the python api – to do a DNS export of all the zone files our account owns on Cloudflare. I will also share that.

The details

I call the program below listrecords.py. This one was copied from somewhere and it worked without modification:

import CloudFlare
import sys

def main():
    zone_name = sys.argv[1]

    cf = CloudFlare.CloudFlare()

    # query for the zone name and expect only one value back
    try:
        zones = cf.zones.get(params = {'name':zone_name,'per_page':1})
    except CloudFlare.exceptions.CloudFlareAPIError as e:
        exit('/zones.get %d %s - api call failed' % (e, e))
    except Exception as e:
        exit('/zones.get - %s - api call failed' % (e))

    if len(zones) == 0:
        exit('No zones found')

    # extract the zone_id which is needed to process that zone
    zone = zones[0]
    zone_id = zone['id']

    # request the DNS records from that zone
    try:
        dns_records = cf.zones.dns_records.get(zone_id)
    except CloudFlare.exceptions.CloudFlareAPIError as e:
        exit('/zones/dns_records.get %d %s - api call failed' % (e, e))

    # print the results - first the zone name
    print("zone_id=%s zone_name=%s" % (zone_id, zone_name))

    # then all the DNS records for that zone
    for dns_record in dns_records:
        r_name = dns_record['name']
        r_type = dns_record['type']
        r_value = dns_record['content']
        r_id = dns_record['id']
        print('\t', r_id, r_name, r_type, r_value)

    exit(0)

if __name__ == '__main__':
    main()

The next script adds a DNS record. This is the one which I needed to modify.

# kind of from https://github.com/cloudflare/python-cloudflare
# except that most of their python examples are wrong. So this is a working version...
import sys
import CloudFlare

def main():
    zone_name = sys.argv[1]
    print('input zone name',zone_name)
    cf = CloudFlare.CloudFlare()
# zone_info is a list: [{'id': '20bd55fbc94ff155c468739', 'name': 'johnstechtalk-2.com', 'status': 'pending',
    zone_info = cf.zones.get(params={'name': zone_name})
    zone_id = zone_info[0]['id']

    dns_records = [
        {'name':'foo', 'type':'A', 'content':'192.168.0.1'},
    ]

    for dns_record in dns_records:
        r = cf.zones.dns_records.post(zone_id, data=dns_record)
    exit(0)

if __name__ == '__main__':
    main()

The zone_id is where the original program’s wheels fell off: the original bad code did a POST just to get the zone_id, which didn’t seem right to me, so the version above does a GET on the zone name instead. Cloudflare Support does not support this python api, at least that’s what they told me, so I was on my own. What gave me confidence that it really should work is that when you install the python package, it also installs cli4, a command line program for linux. cli4 works pretty well, and its examples work. When you examine it you realize it is (I think) using the python package behind the scenes.
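The zone lookup can be pulled out into a small helper so that a missing zone fails with a clear message instead of an IndexError. This is only a sketch: pick_zone_id is a hypothetical helper, not part of the CloudFlare package, and it assumes the list-of-dictionaries shape shown in the comment in the script above.

```python
def pick_zone_id(zone_info, zone_name):
    """Return the id of the zone called zone_name from a zones.get()-style list.

    zone_info is assumed to look like
    [{'id': '...', 'name': 'example.com', ...}];
    pick_zone_id is a hypothetical helper, not part of the CloudFlare package.
    """
    for zone in zone_info:
        if zone.get('name') == zone_name:
            return zone['id']
    raise LookupError(f'No zone named {zone_name} in this account')


# Example with canned data instead of a live API call
sample = [{'id': 'abc123', 'name': 'example.com', 'status': 'active'}]
print(pick_zone_id(sample, 'example.com'))
```

In the script, the line that sets zone_id would then become zone_id = pick_zone_id(zone_info, zone_name).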

Backup all zones in the Cloudflare account by doing a DNS export

I call this script backup-all-zones.py:

import os
import CloudFlare
from datetime import datetime

def listzones(cf):
    allzones = list()
    page_number = 0
    while True:
        page_number += 1
        raw_results = cf.zones.get(params={'per_page':20,'page':page_number})
        #print(raw_results)
        zones = raw_results['result']

        for zone in zones:
            zone_id = zone['id']
            zone_name = zone['name']
            print("zone_id=%s zone_name=%s" % (zone_id, zone_name))
            allzones.append([zone_id,zone_name])

        total_pages = raw_results['result_info']['total_pages']
        if page_number >= total_pages: # >= also stops the loop when an account has no zones (total_pages is 0)
            break
    #print('allzones',allzones)
    return allzones

# main program
today = datetime.today().date() # today's date
date = today.strftime('%Y%m%d') # formatted date
print('Begin backup of zones on this day:',date)
newdir = 'zones-' + date
os.makedirs(newdir,exist_ok=True)

cf = CloudFlare.CloudFlare(raw=True)
print('Getting list of all zones and zone ids')
allzones = listzones(cf)
print('Begin export of the zone data')
for zone in allzones:
    zone_id,zone_name = zone
    print('Doing dns export of',zone_id,zone_name)
# call to do a BIND-style export of the zone, specified by zoneid
    res = cf.zones.dns_records.export.get(zone_id)
    dns_records = res['result']
    with open(f'{newdir}/{zone_name}','w') as f:
        f.write(dns_records)
# create compressed tar file and delete temp directory
print('Create compressed tar file')
os.makedirs('backups',exist_ok=True) # tar fails if the target directory is missing
os.system(f'tar czf backups/{newdir}.tar.gz {newdir}')
print(f'Remove directory {newdir} and all its contents')
os.system(f'rm -rf {newdir}')

As the comments mention, the cool thing about this backup is that the export comes out as BIND-style zone files, which are quite readable. The script is designed for linux systems (it shells out to tar and rm) because that’s all I use.
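If the dependence on the tar and rm commands is a concern, the archive step can probably be done with the standard library instead. The following is a sketch under that assumption, not what the script above does: archive_and_remove is a hypothetical name, and the backups directory name is carried over from the script.

```python
import os
import shutil
import tarfile


def archive_and_remove(src_dir, backup_dir='backups'):
    """Write backup_dir/<name of src_dir>.tar.gz, then delete src_dir."""
    os.makedirs(backup_dir, exist_ok=True)
    base = os.path.basename(os.path.normpath(src_dir))
    tar_path = os.path.join(backup_dir, base + '.tar.gz')
    with tarfile.open(tar_path, 'w:gz') as tar:
        # arcname keeps the paths inside the archive relative to the directory name
        tar.add(src_dir, arcname=base)
    shutil.rmtree(src_dir)
    return tar_path
```

Calling archive_and_remove(newdir) at the end of the script would stand in for the two os.system calls.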

The environment

Just to note it, you install the package with pip3 install cloudflare. Then I set the environment variable CLOUDFLARE_API_TOKEN before running these programs.
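A script can check for the token up front and stop with a readable message rather than failing later inside an API call. This is a minimal sketch; require_env is a hypothetical helper and is not something the CloudFlare package provides.

```python
import os


def require_env(name):
    """Return the value of environment variable name, or stop with a clear message."""
    value = os.environ.get(name, '')
    if not value:
        raise SystemExit(f'Please set {name} before running this script')
    return value


# Typical use at the top of a script:
# require_env('CLOUDFLARE_API_TOKEN')
```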

Conclusion

I’ve shown a corrected python script which uses the Cloudflare api. I’ve also shown another one which can do a backup of all Cloudflare zones.

References and related

The Cloudflare api

The (wrong) api examples on github

My hearty endorsement of Using Cloudflare’s free tier to protect your personal web site.


ADO: Check pipeline runs

Intro

I have previously written how to copy all Azure DevOps (ADO) logs to a linux server. In this post I share a script I wrote which does a quality check on all the most recent pipeline runs. If there are any issues, a message is sent to an MS Teams channel.

Let’s get into the details.

Preliminary details

I am using the api, needless to say. I cannot say I have mastered the api or even come close to understanding it. However, I have reused the same api call I used previously, since I observed that its response contains a lot of interesting data.

conf_check_all.ini

This config file is written as json to make importing a breeze. You set up optional trigger conditions for each pipeline you run, or even whether to perform any checks on that pipeline at all.

{
"organization":"drjohns4ServicesCoreSystems",
"project":"Connectivity",
"url_base":"https://dev.azure.com/",
"url_params":"&api-version=7.1-preview.7",
"test_flag":false,
"run_ct_min":2,
"queue_time_max":1800,
"pipelines":{
"comment":{"maximum_processing_time_in_seconds":"integer_value","minimum_processing_time_in_seconds":"integer_value","(optional) check_flag - to potentially disable the checks for this pipeline":"either true or false"},
"default":{"max_proc_time":1800,"min_proc_time":3,"check_flag":true},
"feed_influxdb":{"max_proc_time":180,"min_proc_time":3},
"PAN-Usage4Mgrs":{"max_proc_time":900,"min_proc_time":60,"check_flag":true},
"PAN-Usage4Mgrs-2":{"max_proc_time":900,"min_proc_time":60},
"speed-up-sampling":{"max_proc_time":900,"min_proc_time":2,"check_flag":false},
"Pipeline_check":{"max_proc_time":45,"min_proc_time":2,"check_flag":false},
"Discover new vEdges":{"max_proc_time":3600,"min_proc_time":3,"check_flag":true}
}
}

So you see at the bottom is a dictionary where the keys are the names of the pipelines I am running, plus a default entry.
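The way the default entry and the optional check_flag are meant to be read can be sketched in a few lines. The helper names settings_for and is_checked are hypothetical (the real lookup lives inside check_this_build in aux_modules.py, shown further down), and the dictionary is a cut-down copy of the config above.

```python
def settings_for(pipelines, name):
    """Return the thresholds for pipeline name, falling back to the 'default' entry."""
    return pipelines.get(name, pipelines['default'])


def is_checked(settings):
    """check_flag is optional; a missing flag means the pipeline is checked."""
    return settings.get('check_flag', True)


pipelines = {
    'default': {'max_proc_time': 1800, 'min_proc_time': 3, 'check_flag': True},
    'feed_influxdb': {'max_proc_time': 180, 'min_proc_time': 3},
    'Pipeline_check': {'max_proc_time': 45, 'min_proc_time': 2, 'check_flag': False},
}

for name in ('feed_influxdb', 'Pipeline_check', 'some-new-pipeline'):
    s = settings_for(pipelines, name)
    print(name, s['max_proc_time'], is_checked(s))
```

A pipeline that is not listed at all, such as some-new-pipeline here, simply picks up the default thresholds.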

check_all_pipelines.py

#!/usr/bin/python3
# check the most recent pipeline runs and report any problems to MS Teams
# for relevant api section, see:
#https://learn.microsoft.com/en-us/rest/api/azure/devops/build/builds/get-build-log?view=azure-devops-rest-7.1
import urllib.request,json,sys,os
from datetime import datetime,timedelta
from modules import aux_modules

conf_file = sys.argv[1]

# pipeline uses UTC so we must follow suit or we will miss files
#a_day_ago = (datetime.utcnow() - timedelta(days = 1)).strftime('%Y-%m-%dT%H:%M:%SZ')
startup_delay = 30 # rough time in seconds before the pipeline even begins to execute our script
an_hour_ago = (datetime.utcnow() - timedelta(hours = 1, seconds = startup_delay)).strftime('%Y-%m-%dT%H:%M:%SZ')
print('An hour ago was (UTC)',an_hour_ago)
format = '%Y-%m-%dT%H:%M:%SZ'

#url = 'https://dev.azure.com/drjohns4ServicesCoreSystems/Connectivity/_apis/build/builds?minTime=2022-10-11T13:00:00Z&api-version=7.1-preview.7'

# dump config file into a dict
config_d = aux_modules.parse_config(conf_file)
test_flag = config_d['test_flag']
if test_flag:
    print('config_d',config_d)
    print('We are in a testing mode because test_flag is:',test_flag)

url_base = f"{config_d['url_base']}{config_d['organization']}/{config_d['project']}/_apis/build/builds"
url = f"{url_base}?minTime={an_hour_ago}{config_d['url_params']}"
#print('url',url)
req = urllib.request.Request(url)
req.add_header('Authorization', 'Basic ' + os.environ['ADO_AUTH'])

# Get buildIds for pipeline runs from last 1 hour
with urllib.request.urlopen(req) as response:
   html = response.read()
txt_d = json.loads(html)
#{"count":215,"value":[{"id":xxx, "buildNumber":"20230203.107","status":"completed","result":"succeeded","queueTime":"2023-02-03T21:12:01.0865046Z","startTime":"2023-02-03T21:12:05.2177605Z","finishTime":"2023-02-03T21:17:28.1523128Z","definition":{"name":"PAN-Usage4Mgrs-2"
value_l = txt_d['value']
all_msgs = ''
header_msg = '**Recent pipeline issues**\n'
# check for too few pipeline runs
if len(value_l) <= config_d['run_ct_min']:
    all_msgs = f"There have been fewer than expected pipeline runs this past hour. Greater than **{config_d['run_ct_min']}** runs are expected, but there have been only **{len(value_l)}** runs.  \nSomething may be wrong.  \n"

for builds in value_l:
    msg = aux_modules.check_this_build(builds,config_d,url_base)
    if msg: all_msgs = f"{all_msgs}  \n{msg}  \n"

if all_msgs:
    if not test_flag: aux_modules.sendMessageToTeams(header_msg + all_msgs) # send to WebHook if not in a testing mode
    print(header_msg + all_msgs)
else:
    print('No recent pipeline errors')

Short explanation

I consider the code to be mostly self-explanatory. A cool thing I’m trying out here is the f-string (formatted string literal), which builds a string kind of like sprintf. I run this script every hour from, yes, an ADO pipeline! But since this job looks for errors, including errors which indicate a systemic problem with the agent pool, I run it from a different agent pool.
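To make the f-string and the one-hour window concrete, here is a small sketch of how the minTime value and the url fit together. The function name min_time_stamp and the myorg/myproject path are placeholders of mine, and a fixed timestamp is used so the result is reproducible.

```python
from datetime import datetime, timedelta, timezone


def min_time_stamp(now, startup_delay=30):
    """Return the UTC timestamp for one hour (plus startup_delay seconds) before now."""
    start = now - timedelta(hours=1, seconds=startup_delay)
    return start.strftime('%Y-%m-%dT%H:%M:%SZ')


# fixed "now" for the example; the real script uses the current UTC time
now = datetime(2023, 2, 3, 21, 12, 30, tzinfo=timezone.utc)
stamp = min_time_stamp(now)

# f-string: whatever is inside {} is evaluated and dropped into the string
url = f"https://dev.azure.com/myorg/myproject/_apis/build/builds?minTime={stamp}&api-version=7.1-preview.7"
print(url)
```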

aux_modules.py

import json,re
import os,urllib.request
from datetime import datetime,timedelta
import pymsteams

def parse_config(conf_file):
# config file should be a json file
    f = open(conf_file)
    config_d = json.load(f)
    f.close()
    return config_d

def get_this_log(config_d,name,buildId,build_number):
# leaving out the api-version etc works better
#GET https://dev.azure.com/{organization}/{project}/_apis/build/builds/{buildId}/logs/{logId}?api-version=7.1-preview.2
#https://dev.azure.com/drjohns4ServicesCoreSystems/d6338e-f5b4-45-6c-7b3a86/_apis/build/builds/44071/logs/7'
        buildId_s = str(buildId)
        log_name = config_d['log_dir'] + "/" + name + "-" + build_number
# check if we already got this one
        if os.path.exists(log_name):
            return
        #url = url_base + organization + '/' + project + '/_apis/build/builds/' + buildId_s + '/logs/' + logId + '?' + url_params
        url = config_d['url_base'] + config_d['organization'] + '/' + config_d['project'] + '/_apis/build/builds/' + buildId_s + '/logs/' + config_d['logId']
        print('url for this log',url)
        req = urllib.request.Request(url)
        req.add_header('Authorization', 'Basic ' + config_d['auth'])
        with urllib.request.urlopen(req) as response:
            html = response.read()
        #print('log',html)
        print("Getting (name,build_number,buildId,logId) ",name,build_number,buildId_s,config_d['logId'])
        f = open(log_name,"wb")
        f.write(html)
        f.close()

def check_this_build(builds,config_d,url_base):
    format = '%Y-%m-%dT%H:%M:%SZ'
    buildId = builds['id']
    build_number = builds['buildNumber']
    status = builds['status'] # normally: completed
    result = builds['result'] # normally: succeeded
    queueTime = builds['queueTime']
    startTime = builds['startTime']
    finishTime = builds['finishTime']
    build_def = builds['definition']
    name = build_def['name']
    print('name,build_number,id',name,build_number,buildId)
    print('status,result,queueTime,startTime,finishTime',status,result,queueTime,startTime,finishTime)
    qTime = re.sub(r'\.\d+','',queueTime)
    fTime = re.sub(r'\.\d+','',finishTime)
    sTime = re.sub(r'\.\d+','',startTime)
    qt_o = datetime.strptime(qTime, format)
    ft_o = datetime.strptime(fTime, format)
    st_o = datetime.strptime(sTime, format)
    duration_o = ft_o - st_o
    duration = int(duration_o.total_seconds())
    print('duration',duration)
    queued_time_o = st_o - qt_o
    queued_time = int(queued_time_o.total_seconds())
    queue_time_max = config_d['queue_time_max']
# and from the config file we have...
    pipes_d = config_d['pipelines']
    this_pipe = pipes_d['default']
    if name in pipes_d: this_pipe = pipes_d[name]
    msg = ''
    if 'check_flag' in this_pipe:
        if not this_pipe['check_flag']:
            print('Checking for this pipeline has been disabled: ',name)
            return msg # skip this build if in test mode or whatever
    print('duration,min_proc_time,max_proc_time',duration,this_pipe['min_proc_time'],this_pipe['max_proc_time'])
    print('queued_time,queue_time_max',queued_time,queue_time_max)
    if duration > this_pipe['max_proc_time'] or duration < this_pipe['min_proc_time']:
        msg = f"ADO Pipeline **{name}** run is outside of expected time range. Build number: **{build_number}**. \n  Duration, max_proc_time, min_proc_time: **{duration},{this_pipe['max_proc_time']},{this_pipe['min_proc_time']}**"
    if not status == 'completed' or not result == 'succeeded':
        msg = f"ADO Pipeline **{name}** run has unexpected status or result. Build number: **{build_number}**. \n  - Status: **{status}** \n  - Result: **{result}**"
    if queued_time > queue_time_max: # Check if this job was queued for too long
        msg = f"ADO Pipeline **{name}** build number **{build_number}** was queued too long. Queued time was **{queued_time}** seconds"
    if msg:
# get the logs meta info to see which log is the largest
        url = f"{url_base}/{buildId}/logs"
        req = urllib.request.Request(url)
        req.add_header('Authorization', 'Basic ' + os.environ['ADO_AUTH'])
# Get the list of logs for this build
        with urllib.request.urlopen(req) as response:
            html = response.read()
        txt_d = json.loads(html)
        value_l = txt_d['value']
#{"count":11,"value":[{"lineCount":31,"createdOn":"2023-02-13T19:03:17.577Z","lastChangedOn":"2023-02-13T19:03:17.697Z","id":1...
        l_ct_max = 0
        log_id_err = 0
        log_id_all = 0 # defined up front so a build with few or empty logs cannot raise NameError
# determine log with either an error or the most lines - it differs for different pipeline jobs
        for logs_d in value_l[4:]:      # only consider the later logs
            url = f"{url_base}/{buildId}/logs/{logs_d['id']}"
            req = urllib.request.Request(url)
            req.add_header('Authorization', 'Basic ' + os.environ['ADO_AUTH'])
            with urllib.request.urlopen(req) as response:
                html = response.read().decode('utf-8')
            if re.search('error',html):
                log_id_err = logs_d['id']
                print('We matched the word error in log id',log_id_err)
            l_ct = logs_d['lineCount']
            if l_ct > l_ct_max:
                l_ct_max = l_ct
                log_id_all = logs_d['id']
        if log_id_err > 0 and not log_id_all == log_id_err: # error over long log file when in conflict
            log_id_all = log_id_err
        url_all_logs = f"{url_base}/{buildId}/logs/{log_id_all}"
        msg = f"{msg}  \n**[Go to Log]({url_all_logs})**  "
    print(msg)
    return msg

def sendMessageToTeams(msg: str):
    """
    Send a message to a Teams Channel using webhook
    """
# my Pipeline_check webhook
    webHookUrl = "https://drjohns.webhook.office.com/webhookb2/66f741-9b1e-401c-a8d3-9448d352db@ec386b-c8f-4c0-a01-740cb5ba55/IncomingWebhook/2c8e881d05caba4f484c92617/7909f-d2f-b1d-3c-4d82a54"
    try:
        # escaping underscores to avoid alerts in italics.
        msg = msg.replace('_', '\\_')
        teams_msg = pymsteams.connectorcard(webHookUrl)
        teams_msg.text(f'{msg}')
        teams_msg.send()

    except Exception as e:
        print(f'failed to send alert: {str(e)}')

aux_modules.py contains most of the logic with checking each pipeline against the criteria and constructing an alert in Markdown to send to MS Teams. I’m not saying it’s beautiful code. I’m still learning. But I am saying it works.

I’ve revised the code to find the log file which is most likely to contain the “interesting” stuff. That’s usually the longest one, not counting the first few (the code skips the first four). There are often about 10 logs available for even a minimal pipeline run. So this extra effort helps.

Then I further revised the code to fetch the logs and look for the word “error.” That may show up in the longest log or it may not. If it shows up in a different log, the log with the error takes precedence as the most interesting log.
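Stripped of the HTTP calls, the selection rule described above comes down to something like the following. It is a sketch rather than the code in aux_modules.py: pick_log_id is a hypothetical name, and it takes the ids of the logs that matched “error” as an argument instead of fetching each log itself.

```python
def pick_log_id(logs, ids_with_error, skip=4):
    """Choose which log to link to in the alert.

    logs: list of dicts with 'id' and 'lineCount' (the 'value' list of the logs call).
    ids_with_error: ids of logs whose text contained the word 'error'.
    Returns None when no log is left after skipping the first few.
    """
    candidates = logs[skip:]
    if not candidates:
        return None
    error_ids = [log['id'] for log in candidates if log['id'] in ids_with_error]
    if error_ids:
        return error_ids[-1]  # the last log with an error wins over the longest log
    return max(candidates, key=lambda log: log['lineCount'])['id']


logs = [
    {'id': 1, 'lineCount': 500}, {'id': 2, 'lineCount': 10},
    {'id': 3, 'lineCount': 10}, {'id': 4, 'lineCount': 10},
    {'id': 5, 'lineCount': 31}, {'id': 6, 'lineCount': 200},
    {'id': 7, 'lineCount': 12},
]
print(pick_log_id(logs, set()))
print(pick_log_id(logs, {7}))
```

Keeping the rule in a function of its own like this would also make it easy to test without touching ADO.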

check_all_pipelines.yml

# Python package
# Create and test a Python package on multiple Python versions.
# Add steps that analyze code, save the dist with the build record, publish to a PyPI-compatible index, and more:
# https://docs.microsoft.com/azure/devops/pipelines/languages/python

##trigger:
##- main

trigger: none

pool:
  name: dsc-adosonar-drjohns4ServicesCoreSystems-agent
#  name: visibility_agents

#strategy:
#  matrix:
#    Python36:
#      python.version: '3.6'

steps:
#- task: UsePythonVersion@0
#  inputs:
#    versionSpec: '$(python.version)'
#  displayName: 'Use Python $(python.version)'

- script: pip3 install -vvv --timeout 60 -r Pipeline_check/requirements.txt
  displayName: 'Install requirements'

- script: python3 check_all_pipelines.py conf_check_all.ini
  displayName: 'Run script'
  workingDirectory: $(System.DefaultWorkingDirectory)/Pipeline_check
  env:
    ADO_AUTH: $(ado_auth)
    PYTHONPATH: $(System.DefaultWorkingDirectory)/Pipeline_check:$(System.DefaultWorkingDirectory)
schedules:
- cron: "19 * * * *"
  displayName: Run the script at 19 minutes after the hour
  branches:
    include:
    - main
  always: true

This yaml file we sort of drag around from pipeline to pipeline so some of it may not appear too optimized for this particular pipeline. But it does the job without fuss.

Success compels further development

This pipeline watcher has been so successful – and frankly I’m surprised this post isn’t more popular than it is because it gives away for free a great idea – that I decided what we really need is a second pipeline checker to do nothing other than check the first Pipeline_check! Fortunately I had written it in such a general-purpose way that this proved to be easy. I created a second yaml file and a second config file. The second config file sets the default to NOT check other pipelines, and has an explicit entry for Pipeline_check to enable checking. Then I run it on other ADO agents available to me. And all is good. This was needed because the Pipeline_check job itself has stopped running twice in the last couple of months through no fault of my own.

The hardest part was recalling my ADO_AUTH value, which is a secret variable. For the record, it is the base64-encoded string formed from

any:{PAT}

where {PAT} is my Personal Access Token (which I had recorded). On any linux server you can do the encoding by running base64. Encode is the default action. Note that echo appends a newline, which would get encoded as well, so suppress it: echo -n any:{PAT}|base64 should produce the needed string.
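The same string can be produced in Python, which avoids any question about a trailing newline sneaking in from the shell. A minimal sketch follows; ado_auth_value is a hypothetical name and the token shown is a placeholder, not a working PAT.

```python
import base64


def ado_auth_value(pat, user='any'):
    """Return the base64 form of 'user:pat' for an HTTP Basic Authorization header."""
    raw = f'{user}:{pat}'.encode('ascii')
    return base64.b64encode(raw).decode('ascii')


# 'not-a-real-token' is a placeholder; substitute your own PAT
print(ado_auth_value('not-a-real-token'))
```

The output is what goes into the ADO_AUTH secret variable, and the scripts above prepend 'Basic ' to it when building the Authorization header.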

Conclusion

My ADO pipeline checker is conveniently showing us all our pipeline runs which have failed for various reasons – takes too long, completed with errors, too few jobs have been run in the last hour, … It sends its output to an MS Teams channel we have subscribed to, by way of a webhook we set up. So far it’s working great!

References and related

Here’s my post on fetching the script log resulting from the pipeline run.


How to escape Linux commands for the WordPress editor

Intro
I develop a lot of stuff on the Linux command line. Then I want to share it on my blog, which is implemented in WordPress. I only use HTML editing mode because the alternative was disastrous. But my commands were being mangled, even when they displayed OK. Copying them to the clipboard and pasting them into a shell spit out some really strange characters, not at all what I had in my original command. What to do?

The details
An example will go a long way to show what I mean. Say I want to examine what static routes I’ve created on my server and send the results to a file. This came up a couple posts ago. So I developed the command:

$ netstat ‐rn|cut ‐c‐16|egrep ‐v ^'10\.|172|169' > /tmp/results

Now if I enter it literally in my blog with those characters it would appear like this:

$ netstat -rn|cut -c-16|egrep -v ^’10\.|172|169′ > /tmp/results

I can avoid formatting issues by using the <pre> tag, but then I can’t bold my commands. Stylistically I try to follow the style where commands typed in by the user are in boldface.

A python program solves the problem
I developed the following python program which spits out properly encoded characters that I’ve determined are at risk of being misrepresented in my blog. I call it htmlescape.

#!/usr/bin/python3
# mostly lifted from https://wiki.python.org/moin/EscapingHtml
# DrJ - 7/22/16
import sys
# characters at risk of being mangled, mapped to their HTML entities
html_escape_table = {
    "&": "&amp;",
    '"': "&quot;",
    "'": "&apos;",
    ">": "&gt;",
    "<": "&lt;",
    "-": "&hyphen;",
    }
def html_escape(text):
    """Produce entities within text."""
    return "".join(html_escape_table.get(c,c) for c in text)
sys.stdout.write("Enter your command string: ")
sys.stdout.flush() # make sure the prompt shows before we wait for input
code = sys.stdin.readline()
print(code)
print(html_escape(code))

Why the command string prompt
I realized that if I allowed the shell to intervene it would mangle my single quotes, double quotes, dollar signs and a whole lot more. So I wanted to be in the context of a special shell, which python provides with its sys.stdin/stdout functions. They are perfect – they do not do any character manipulation.

A few comments about the characters

Why encode the hyphen? It comes up all the time as prefix character to command arguments. A single hyphen gets represented OK, but some commands actually require a double hyphen, ‐‐, and that gets mangled. Also, I’ve noticed that minus sign and hyphen are represented differently in HTML. The minus sign is shown to be longer and just doesn’t look right. And that is the default representation of the “-” character, even though in shell commands you almost always mean it as a hyphen, ‐.

The apostrophe is important to prevent the shell from interpolating variables inside a set of apostrophes. In the context of the shell it is more appropriately called a single quote or a tick mark. In HTML, browsers try to be fancy: they look for pairs of single quotes and turn one upside-down, rendering it as an entirely different character. Same thing for double quotes.

Strangely, the back tick ` does not suffer a similar fate. That does not get mangled so no need to represent it in encoded form. At least as far as I’ve seen. I suspect that somewhere under some circumstances it too might get mangled, but I can’t produce those conditions right now.

Example 2
$ curl –noproxy –show-error

Those are double hyphens, which is the correct syntax for using curl! But it renders as a long dash, which is bad enough, and put it in your clipboard and paste it into a shell and it produces garbage characters. At the end I put a <url> but that just became totally invisible! Running htmlescape on it makes the same look like this:

$ curl ‐‐noproxy ‐‐show‐error <url>

In my editor screen I have entered this:

$ curl &hyphen;&hyphen;noproxy &hyphen;&hyphen;show&hyphen;error &lt;url&gt;

And how did I produce that line? Why, I took the previous rendering and ran it through htmlescape one more time!
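One way to see what the second pass does is to run the escape function twice on the plain command. This is a sketch that repeats the table from htmlescape so it can run on its own. The first pass gives what you type into the editor to display the command, and the second pass gives what you type to display the entities themselves.

```python
html_escape_table = {
    "&": "&amp;",
    '"': "&quot;",
    "'": "&apos;",
    ">": "&gt;",
    "<": "&lt;",
    "-": "&hyphen;",
}


def html_escape(text):
    """Replace each at-risk character with its HTML entity."""
    return "".join(html_escape_table.get(c, c) for c in text)


command = "$ curl --noproxy --show-error <url>"
once = html_escape(command)   # enter this to make the command display correctly
twice = html_escape(once)     # enter this to make the entities themselves display
print(once)
print(twice)
```

The second pass only touches the ampersands, since the first pass already replaced every other character in the table.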

Alternatives considered
I hate looking for plugins. None work exactly the way you want, they are poorly documented, fall out of support, etc. So yes there are plugins which may be able to work, but I think for my situation I like to maintain full control and go my own way.
Up until yesterday I was doing all the character substitutions by hand! That’s another alternative, but it gets tiresome.

Conclusion
A python program is presented which properly escapes Linux command line strings for suitable publication in a WordPress blog.