RCS Zetta allows you to load music and traffic logs via the ASCIICON file format. I’ve created a Python script to monitor Zetta for any new log file imports, and email you a summary. This is particularly useful to monitor late-changes to traffic, so someone can check the timing of automated shifts.

The emails look like this:

The email has been designed to answer a few key questions, including which station, the log date, who performed the import, and what the breakdown of assets is. This allows the recipient (generally a music or content director) to quickly see what’s happened and take action if necessary.

There is also a variant of this email which sends if ONLY Spots are imported. This is so we can track traffic log loads separately.

Download the Zetta Log Load Email Script

This script has been written for Python 2.7, and RCS Zetta 4.1. It may work on earlier and later versions of Zetta.

Here’s the script:

"""
Zetta Log Import Emailer

This script will email you whenever a new log is imported into Zetta
Written by Anthony Eden (https://mediarealm.com.au/)
"""

# You need to download this module: https://pypi.python.org/pypi/pypyodbc

# Email config options
EMAIL_TO = ""
EMAIL_FROM = ""
EMAIL_SERVER = ""

# Zetta database ODBC connection name:
SQLConnectionString = 'DSN=ZettaDb;'





import time
import datetime
import pypyodbc

# Email libraries
import smtplib
from email.mime.text import MIMEText

# Setup Zetta SQL DB Connection:
dbConn = pypyodbc.connect(SQLConnectionString)
cur = dbConn.cursor()


def listImportTimes():
    eventsQuery = '''
    SELECT
        DISTINCT InfoCreatedTime,
        Station.StationID,
        UserName,
        CpuDescription,
        xLogger_ActivityLog.TrackerAccountID,
        Station.Name AS StationName
        
    FROM [ZettaDB].[dbo].[xLogger_ActivityLog],
        [ZettaDB].[dbo].[xLogger_TrackerAccount],
        [ZettaDB].[dbo].[Station]
        
        WHERE ActivityLocationID = 103
        AND xLogger_ActivityLog.TrackerAccountID = xLogger_TrackerAccount.TrackerAccountId
        AND xLogger_ActivityLog.StationID = Station.StationID
        AND InfoCreatedTime >= DATEADD(MINUTE, -10, GETUTCDATE())
        ORDER BY InfoCreatedTime DESC
    '''
    
    cur.execute(eventsQuery)
    events = []
    
    for d in cur:

        if d[2] == "":
                UserName = "SYSTEM USER"
        else:
                UserName = d[2]
        
        events.append({
                "InfoCreatedTime": d[0],
                "StationID": d[1],
                "StationName": d[5],
                "UserName": UserName,
                "CpuDescription": d[3],
                "TrackerAccountID": d[4]
        })
    
    return events

def listImportDate(eventTime, accountId):
    assetTypesQuery = '''
    SELECT TOP 1 Changes
        FROM [ZettaDB].[dbo].[xLogger_ActivityLog]
        WHERE ActivityLocationID = 103
        AND InfoCreatedTime = ?
        AND TrackerAccountID = ?
    '''
    
    cur.execute(assetTypesQuery, [eventTime, int(accountId)])

    for d in cur:
        dateString = d[0].split(" Hour ")
        return dateString[0].split("log ")[1]
    

def listImportAssetTypes(eventTime, accountId):
    assetTypesQuery = '''
    SELECT Changes
        FROM [ZettaDB].[dbo].[xLogger_ActivityLog]
        WHERE ActivityLocationID = 103
        AND InfoCreatedTime = ?
        AND TrackerAccountID = ?
    '''
    
    cur.execute(assetTypesQuery, [eventTime, int(accountId)])

    # Store a count of every asset type found in the new log
    assetTypes = {
        "Other/Unknown": 0
    }

    assetTypesMatch = {
        "Song": "Play Song: ",
        "Link": "Play Link: ",
        "Spot": "Play Spot: ",
        "Exact Time Marker": "Exact Time Marker: ",
        "Unresolved Asset": "Unresolved Asset: ",
        "Top of Hour": "Top of Hour: ",
        "Spot Block": "Spot Block: ",
        "Macro": "Macro: ",
        "Empty Voice Track Slot": "Empty Voice Track Slot: ",
        "Comment": "Comment: ",
    }

    for assetType in assetTypesMatch:
        assetTypes[assetType] = 0
    
    for d in cur:
        foundMatch = False
        for assetType in assetTypesMatch:
                if assetTypesMatch[assetType] in d[0]:
                        assetTypes[assetType] += 1
                        foundMatch = True
                        break
        
        if foundMatch is False:
            print d[0]
            assetTypes['Other/Unknown'] += 1
    
    return assetTypes

def sendEmail(to, subject, body):
    msg = MIMEText(body)
    
    msg['Subject'] = subject
    msg['From'] = EMAIL_FROM
    msg['To'] = to
    
    s = smtplib.SMTP(EMAIL_SERVER)
    s.sendmail(EMAIL_FROM, [to], msg.as_string())
    s.quit()

if __name__ == "__main__":

    notifiedImportTimes = []
    
    while True:
        try:
            for event in listImportTimes():
                    if event['InfoCreatedTime'] not in notifiedImportTimes:
                        notifiedImportTimes.append(event['InfoCreatedTime'])
                        spotsOnly = True

                        emailSubject = "Log Imported: " + event['StationName']
                        emailBody = "A log was imported by " + event['UserName']
                        emailBody += " to station " + event['StationName']
                        emailBody += " on computer " + event['CpuDescription'] + "\r\n\r\n"

                        emailBody += "Log Date: " + listImportDate(event['InfoCreatedTime'], event['TrackerAccountID'])  + "\r\n\r\n"

                        emailBody += "Here's a breakdown of what was imported:\r\n"
                        assetBreakdown = listImportAssetTypes(event['InfoCreatedTime'], event['TrackerAccountID'])

                        for assetType in assetBreakdown:
                            if assetType != "Spot" and assetBreakdown[assetType] > 0:
                                spotsOnly = False
                            
                            emailBody += assetType + ": " + str(assetBreakdown[assetType]) + "\r\n"

                        emailBody += "\r\nImport Time: " + str(event['InfoCreatedTime']) + "\r\n"

                        if spotsOnly is True:
                            emailSubject += " (Spots Only)"
                        
                        print emailSubject
                        print emailBody

                        sendEmail(EMAIL_TO, emailSubject, emailBody)
                        
                        #print event
                        print 
            
            time.sleep(20)
            
        except Exception, e:
            print "Exception!!!", e
            time.sleep(120)


 

To run this script, you must also download the PyPyODBC module, and have access to a SMTP server.

There are four configuration variables to be filled out at the top of the script:

The TO field is the email address of a recipient, or distribution list.

The FROM field is any email address to be seen as sending the email (it doesn’t necessarily have to exist).

The SERVER field is the DNS name or IP Address of your SMTP server. This script doesn’t use SMTP Authentication, so it’s best to point this at either the direct IP of your internal mail server, or perhaps at your ISP’s open relay.

The SQL Connection String is for a ODBC connection. In Windows, you need to open the ODBC Data Source Administrator (32-bit), and create a 32-bit Microsoft SQL Server DSN (either User or System) to your main Zetta Database Server. Ensure you specify credentials, and a default database. Enter the name you select of it in the DSN String for this Script.

When you run the script, it will automatically check for new log imports every 20 seconds. Leave it running on a Utility Server, and you’ll receive automatic emails all day long. It will run for any station in your Zetta Database.

This script has not been tested with GSelector Integrated sites. It may need some modifications to work with GSelector.

Get the Broadcast Technology Newsletter

Sign up for the email newsletter about media and technology. Sent irregularly. No spam.

I'm Anthony Eden, and I'm a IT Professional, Broadcast Technician, Software Developer, and Solutions Engineer. I've been working in broadcast media since 2008, and developing software and websites for just as long. Right now, I provide freelance services through Media Realm - in particular, to the media and not-for-profit industries.

Follow Anthony on Twitter: @anthony_eden