Various Zetta Scripts

Over the last few years of using RCS Zetta, I’ve written a few random scripts that solve little problems here and there. I thought I’d share a few of them with you today.

For many of these scripts, you will need Python 2.7, the pypyodbc module, and a configured ODBC data source pointing to your primary Zetta SQL database.
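
Before running any of the scripts, it may be worth confirming that the ODBC source actually answers queries. The sketch below is one way to do that. The helper name check_dsn is my own invention (it is not part of Zetta or pypyodbc), and it takes the connect function as an argument so it can be tried out without a database.

```python
def check_dsn(connect, connection_string):
    """Return True if a trivial query succeeds over the ODBC connection.

    `connect` is assumed to behave like pypyodbc.connect (a DB-API style
    connect function). This helper is illustrative only.
    """
    try:
        conn = connect(connection_string)
    except Exception:
        # Could not open the connection at all (bad DSN, server down, etc.)
        return False

    try:
        cur = conn.cursor()
        cur.execute("SELECT 1")
        return cur.fetchone() is not None
    except Exception:
        return False
    finally:
        conn.close()
```

With pypyodbc installed, calling check_dsn(pypyodbc.connect, 'DSN=ZettaDB;') should return True if the DSN used by the scripts below is set up correctly.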

These are unofficial scripts, and are not supported in any way by me or RCS.

Log Import Email Alerts

Zetta has a good activity logger, but doesn’t provide much in the way of realtime alerting. This script sends a summary email whenever a log is imported to any station in your Zetta system. This can be particularly useful if you want to find out when a traffic-reload happens.

"""
Zetta Log Import Emailer

This script will email you whenever a new log is imported into Zetta
Written by Anthony Eden (https://mediarealm.com.au/)
"""

# You need to download this module: https://pypi.python.org/pypi/pypyodbc

# Email config options
EMAIL_TO = ""
EMAIL_FROM = ""
EMAIL_SERVER = ""

# Zetta database ODBC connection name:
SQLConnectionString = 'DSN=ZettaDb;'





import time
import datetime
import pypyodbc

# Email libraries
import smtplib
from email.mime.text import MIMEText

# Setup Zetta SQL DB Connection:
dbConn = pypyodbc.connect(SQLConnectionString)
cur = dbConn.cursor()


def listImportTimes():
    eventsQuery = '''
    SELECT
        DISTINCT InfoCreatedTime,
        Station.StationID,
        UserName,
        CpuDescription,
        xLogger_ActivityLog.TrackerAccountID,
        Station.Name AS StationName
        
    FROM [ZettaDB].[dbo].[xLogger_ActivityLog],
        [ZettaDB].[dbo].[xLogger_TrackerAccount],
        [ZettaDB].[dbo].[Station]
        
        WHERE ActivityLocationID = 103
        AND xLogger_ActivityLog.TrackerAccountID = xLogger_TrackerAccount.TrackerAccountId
        AND xLogger_ActivityLog.StationID = Station.StationID
        AND InfoCreatedTime >= DATEADD(MINUTE, -10, GETUTCDATE())
        ORDER BY InfoCreatedTime DESC
    '''
    
    cur.execute(eventsQuery)
    events = []
    
    for d in cur:

        # Imports with no username (blank or NULL) are reported as SYSTEM USER
        if not d[2]:
            UserName = "SYSTEM USER"
        else:
            UserName = d[2]

        events.append({
            "InfoCreatedTime": d[0],
            "StationID": d[1],
            "StationName": d[5],
            "UserName": UserName,
            "CpuDescription": d[3],
            "TrackerAccountID": d[4]
        })
    
    return events

def listImportDate(eventTime, accountId):
    assetTypesQuery = '''
    SELECT TOP 1 Changes
        FROM [ZettaDB].[dbo].[xLogger_ActivityLog]
        WHERE ActivityLocationID = 103
        AND InfoCreatedTime = ?
        AND TrackerAccountID = ?
    '''
    
    cur.execute(assetTypesQuery, [eventTime, int(accountId)])

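    for d in cur:
        # Keep the text between "log " and " Hour " in the Changes field
        dateString = d[0].split(" Hour ")
        return dateString[0].split("log ")[1]

    # No matching row: return a placeholder so the email can still be built
    return "Unknown"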

def listImportAssetTypes(eventTime, accountId):
    assetTypesQuery = '''
    SELECT Changes
        FROM [ZettaDB].[dbo].[xLogger_ActivityLog]
        WHERE ActivityLocationID = 103
        AND InfoCreatedTime = ?
        AND TrackerAccountID = ?
    '''
    
    cur.execute(assetTypesQuery, [eventTime, int(accountId)])

    # Store a count of every asset type found in the new log
    assetTypes = {
        "Other/Unknown": 0
    }

    assetTypesMatch = {
        "Song": "Play Song: ",
        "Link": "Play Link: ",
        "Spot": "Play Spot: ",
        "Exact Time Marker": "Exact Time Marker: ",
        "Unresolved Asset": "Unresolved Asset: ",
        "Top of Hour": "Top of Hour: ",
        "Spot Block": "Spot Block: ",
        "Macro": "Macro: ",
        "Empty Voice Track Slot": "Empty Voice Track Slot: ",
        "Comment": "Comment: ",
    }

    for assetType in assetTypesMatch:
        assetTypes[assetType] = 0
    
    for d in cur:
        foundMatch = False
        for assetType in assetTypesMatch:
            if assetTypesMatch[assetType] in d[0]:
                assetTypes[assetType] += 1
                foundMatch = True
                break

        if foundMatch is False:
            print d[0]
            assetTypes['Other/Unknown'] += 1
    
    return assetTypes

def sendEmail(to, subject, body):
    msg = MIMEText(body)
    
    msg['Subject'] = subject
    msg['From'] = EMAIL_FROM
    msg['To'] = to
    
    s = smtplib.SMTP(EMAIL_SERVER)
    s.sendmail(EMAIL_FROM, [to], msg.as_string())
    s.quit()

if __name__ == "__main__":

    notifiedImportTimes = []
    
    while True:
        try:
            for event in listImportTimes():
                if event['InfoCreatedTime'] not in notifiedImportTimes:
                    notifiedImportTimes.append(event['InfoCreatedTime'])
                    spotsOnly = True

                    emailSubject = "Log Imported: " + event['StationName']
                    emailBody = "A log was imported by " + event['UserName']
                    emailBody += " to station " + event['StationName']
                    emailBody += " on computer " + event['CpuDescription'] + "\r\n\r\n"

                    emailBody += "Log Date: " + listImportDate(event['InfoCreatedTime'], event['TrackerAccountID']) + "\r\n\r\n"

                    emailBody += "Here's a breakdown of what was imported:\r\n"
                    assetBreakdown = listImportAssetTypes(event['InfoCreatedTime'], event['TrackerAccountID'])

                    for assetType in assetBreakdown:
                        # Anything other than spots means this was more than a traffic reload
                        if assetType != "Spot" and assetBreakdown[assetType] > 0:
                            spotsOnly = False

                        emailBody += assetType + ": " + str(assetBreakdown[assetType]) + "\r\n"

                    emailBody += "\r\nImport Time: " + str(event['InfoCreatedTime']) + "\r\n"

                    if spotsOnly is True:
                        emailSubject += " (Spots Only)"

                    print emailSubject
                    print emailBody

                    sendEmail(EMAIL_TO, emailSubject, emailBody)
                    print

            # Check for new imports every 20 seconds
            time.sleep(20)

        except Exception as e:
            # Wait two minutes before retrying after any error
            print "Exception!!!", e
            time.sleep(120)
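
If you want to experiment with the asset-type breakdown without touching the database, the matching logic can be pulled out into plain functions. This is only a sketch: the marker strings are copied from the script above, the function names are hypothetical, and the sample rows are made up rather than real Zetta log entries.

```python
from collections import Counter

# Marker text copied from the script above
ASSET_MARKERS = {
    "Song": "Play Song: ",
    "Link": "Play Link: ",
    "Spot": "Play Spot: ",
    "Exact Time Marker": "Exact Time Marker: ",
    "Unresolved Asset": "Unresolved Asset: ",
    "Top of Hour": "Top of Hour: ",
    "Spot Block": "Spot Block: ",
    "Macro": "Macro: ",
    "Empty Voice Track Slot": "Empty Voice Track Slot: ",
    "Comment": "Comment: ",
}

def classify_change(change_text):
    """Return the asset type label for one Changes entry."""
    for label, marker in ASSET_MARKERS.items():
        if marker in change_text:
            return label
    return "Other/Unknown"

def count_asset_types(changes):
    """Count how many entries fall under each asset type label."""
    return Counter(classify_change(c) for c in changes)

def is_spots_only(counts):
    """True when no asset type other than Spot has a non-zero count."""
    return all(label == "Spot" for label, n in counts.items() if n > 0)

# Made-up sample rows, for illustration only
sample = [
    "Play Song: Example Artist - Example Title",
    "Play Spot: Example Advertiser",
    "Play Spot: Another Advertiser",
    "Something unexpected",
]
counts = count_asset_types(sample)
print(sorted(counts.items()))
print(is_spots_only(counts))
```

Keeping the matching separate from the SQL makes it easier to add a new marker when an unrecognised entry shows up under Other/Unknown.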


Add Timestamps to Background Recorder Asset Titles

Zetta allows you to record audio files in the background and override assets – useful for capturing satellite and codec feeds. However, there’s no quick way for on-air talent to see when an asset was last updated. This script checks all background-recorded assets in the system, and appends a timestamp to each asset’s title.

"""
Zetta Background Recorder Timestamper
This script appends the last-modified timestamp string to assets that are updated by the background recorder.

Written by Anthony Eden (https://mediarealm.com.au)
"""

import pypyodbc
import time
import datetime

# Setup Zetta SQL DB Connection:
SQLConnectionString = 'DSN=ZettaDB;'
dbConn = pypyodbc.connect(SQLConnectionString)
cur = dbConn.cursor()

def utc2local(utc):
    # From https://stackoverflow.com/a/19238551
    epoch = time.mktime(utc.timetuple())
    offset = datetime.datetime.fromtimestamp(epoch) - datetime.datetime.utcfromtimestamp(epoch)
    return utc + offset

def listBackgroundRecorderAssets():
    assetsQuery = '''
        SELECT
            RecordEvent.AssetID,
            Asset.infoModifiedDate,
            Asset.Title,
            [Resource].infoModifiedDate
        FROM
            [RecordEvent],
            [Asset],
            [AssetToResource],
            [Resource]
        WHERE RefId IS NULL
        AND RecordEvent.AssetID IS NOT NULL
        AND RecordEvent.AssetID = Asset.AssetID
        AND AssetToResource.AssetID = Asset.AssetID
        AND AssetToResource.IsPrimaryResource = 1
        AND AssetToResource.ResourceID = [Resource].ResourceID
    '''

    cur.execute(assetsQuery)
    assets = []

    rows = cur.fetchall()
    for d in rows:
        localTime = utc2local(d[3])
        assets.append({
            "AssetID": d[0],
            "Title": d[2],
            "LastModified": d[3],
            "LastModifiedLocal": localTime,
            "AppendDateString": "[" + localTime.strftime("%d/%m/%Y %I:%M:%S %p") + "]"
        })

    return assets

def updateAssetTitle(assetId, title):
    # Sets a new title for an asset
    assetQuery = ''' UPDATE TOP(1) Asset SET Title = ? WHERE AssetID = ? '''
    cur.execute(assetQuery, (title, assetId))
    dbConn.commit()


if __name__ == "__main__":
	
    while True:
        try:
            # Loop over all background recorder assets
            for x in listBackgroundRecorderAssets():

                # Does this asset already have square brackets in the title?
                if "[" in x['Title'] and "]" in x['Title'][x['Title'].index("["):]:
                    # Replace the existing bracketed text with the datetime stamp
                    newTitle = x['Title'][:x['Title'].index("[")] + x['AppendDateString'] + x['Title'][x['Title'].index("]", x['Title'].index("[")) + 1:]

                    if newTitle != x['Title']:
                        print "Update title", newTitle, "for Asset ID", x['AssetID']
                        updateAssetTitle(x['AssetID'], newTitle)
                else:
                    # Append the datetime stamp to the existing title
                    newTitle = x['Title'] + " " + x['AppendDateString']
                    print "New title", newTitle, "for Asset ID", x['AssetID']
                    updateAssetTitle(x['AssetID'], newTitle)

            time.sleep(20)
        except Exception as e:
            print "Exception!!!", e
            time.sleep(120)
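
The title handling is the fiddly part of this script, so here is the same replace-or-append rule as a small standalone function you can test without a database. The name stamp_title and the example titles are hypothetical. The behaviour is meant to mirror the script above: replace the first [...] section if there is one, otherwise append the stamp.

```python
def stamp_title(title, stamp):
    """Replace the first [...] section of title with stamp, or append it."""
    start = title.find("[")
    # Only look for a closing bracket after the opening one
    end = title.find("]", start) if start != -1 else -1

    if start != -1 and end != -1:
        return title[:start] + stamp + title[end + 1:]

    return title + " " + stamp


# Example titles are made up for illustration
print(stamp_title("News Feed", "[01/02/2018 03:04:05 PM]"))
print(stamp_title("News Feed [old stamp] (Sat)", "[new stamp]"))
```

One thing to keep in mind is that any asset which already uses square brackets in its title for another purpose will have that text replaced.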


Batch Export Assets Based On External ID

Let’s say you need to export a lot of assets from Zetta, and you have a list of their External IDs (perhaps you got this list from an external music scheduling program, or a reconciliation file). How can you export all these files to a folder without locating the assets manually in the Library?

Simply paste your list of External IDs into the CONFIG_AssetList variable in this script, specify the source and destination directories, and run it.

"""
Zetta Batch Audio Exporter
Given a list of External IDs, find each asset's file in the Zetta Content Store, and copy it to a separate folder.

Written by Anthony Eden (https://mediarealm.com.au)
"""

from shutil import copyfile
import os
import pypyodbc
import re


# The Zetta Content Store location:
CONFIG_StorageLocation = "\\\\zettaaudio1\\ZAUDIO\\"

# The output location:
CONFIG_OutputStorage = "C:\\SCRIPTS\\ZettaBatchExport\\"

# Paste a list of External IDs here (one per line)
CONFIG_AssetList = """
12345
67890
"""





# Setup Zetta SQL DB Connection:
SQLConnectionString = 'DSN=ZettaDB;'
dbConn = pypyodbc.connect(SQLConnectionString)
cur = dbConn.cursor()

def findAssetByExternalID(ExternalID):
    assetQuery = '''
    SELECT
        Asset.ThirdPartyID,
        Asset.Title,
        Resource.StorageFile,
        AutoGen_AssetReadonlyReferences.ArtistID_1,
        Resource.ResourceID
    FROM
        Asset,
        Resource,
        AssetToResource,
        AutoGen_AssetReadonlyReferences
    WHERE
        Asset.AssetID = AssetToResource.AssetID
        AND AssetToResource.isPrimaryResource = 1
        AND AssetToResource.ResourceID = Resource.ResourceID
        AND Asset.AssetID = AutoGen_AssetReadonlyReferences.AssetID
        AND ThirdPartyID = ?
    '''
	
    cur.execute(assetQuery, [str(ExternalID)])
	
    assetList = []
	
    for d in cur:
        assetList.append({
            "ExternalID": str(d[0]),
            "Filename": d[2],
            "Title": str(d[1]),
            "Artist": getArtistName(d[3]),
            "FileFolder": getStorageFolderFromFilename(d[4]),
            "FileExtension": getExtensionFromFilename(d[2]),
            "ResourceID": str(d[4])
        })
	
    return assetList

def getArtistName(artistId):
    # Take the Artist ID and return the Artist's name

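    try:
        artistId = int(artistId)
    except (TypeError, ValueError):
        # No usable Artist ID: return an empty name so the filename can still be built
        return ""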

    artistNameQuery = '''
    SELECT Artist.Name
    FROM Artist
    WHERE ArtistID = ?'''
    
    cur.execute(artistNameQuery, [artistId])
    
    for d in cur:
        return d[0]
    
    return ""

def getStorageFolderFromFilename(filename):
    # Zetta stores 1000 items in each folder (e.g. from 1000 to 1999)
    # This is based on the ResourceID
    
    folder = str(filename).split("~")[0][:-3]

    if folder == "":
        folder = 0

    return int(folder)

def getExtensionFromFilename(fullname):
    # Returns the file's extension name

    filename, file_extension = os.path.splitext(fullname)
    return file_extension


if __name__ == "__main__":

    IDs = CONFIG_AssetList.split()

    for thisExternalID in IDs:
        # Loop over every specified External ID
        
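        matches = findAssetByExternalID(thisExternalID)
        if not matches:
            # Skip IDs that don't match any asset instead of crashing
            print "NO ASSET FOUND FOR EXTERNAL ID: ", thisExternalID
            continue

        asset = matches[0]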

        assetTitle = re.sub(r'\W+', ' ', asset['Title']).replace("  ", " ").strip()
        assetArtist = re.sub(r'\W+', ' ', asset['Artist']).replace("  ", " ").strip()
        
        srcfile = CONFIG_StorageLocation + str(asset['FileFolder']) + "\\" + asset['Filename']
        dstfile = CONFIG_OutputStorage + assetArtist + " - " + assetTitle + asset['FileExtension']

    
        if os.path.isfile(dstfile):
            print "DST FILE ALREADY EXISTS: ", dstfile
        elif os.path.isfile(srcfile):
            # Do the copy
            print asset['ExternalID'], "||", dstfile
            copyfile(srcfile, dstfile)
        else:
            print "SRC FILE DOESN'T EXIST: ", srcfile
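
To preview where files will be read from and written to before copying anything, the path logic can be tried on its own. This is a rough sketch under the same assumption the script makes (1000 resources per Content Store folder, numbered by ResourceID). The function names and the sample filename are hypothetical.

```python
import os
import re

def storage_folder(resource_id):
    """Folder number for a resource, assuming 1000 resources per folder."""
    return int(resource_id) // 1000

def safe_name(text):
    """Replace runs of characters other than letters, digits and underscores with a space."""
    return re.sub(r'\W+', ' ', text or "").strip()

def build_paths(store, out_dir, resource_id, filename, artist, title):
    """Return (source path, destination path) for one asset."""
    src = store + str(storage_folder(resource_id)) + "\\" + filename
    ext = os.path.splitext(filename)[1]
    dst = out_dir + safe_name(artist) + " - " + safe_name(title) + ext
    return src, dst


# Sample values are made up; the filename pattern is an assumption
src, dst = build_paths(
    "\\\\zettaaudio1\\ZAUDIO\\",
    "C:\\SCRIPTS\\ZettaBatchExport\\",
    1234,
    "1234~example.wav",
    "Example/Artist",
    "Example Title!",
)
print(src)
print(dst)
```

Printing the paths for a handful of External IDs first is a cheap way to catch a wrong Content Store location before running the full export.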
        
        

ZettaView: HTML Status Feed Viewer

Update August 2018: ZettaView is now available for download. Check it out on GitHub.

ZettaView is a simple Zetta Status Feed client, which allows you to view the current and upcoming assets for a specific station in a web browser. This can be useful for outside broadcasts, TV displays around the building, or simple remote troubleshooting.

ZettaView is a Python script, with an in-built Flask web server. For convenience, I’ve bundled this script as an EXE.

Simply download the EXE, rename ‘config-sample.json’ to ‘config.json’, put your Zetta Status Feed Server’s address into the config file, run the EXE, and then browse to http://127.0.0.1:4444.
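
If the page doesn't load, a quick check is whether anything is listening on the port at all. The snippet below is a generic TCP check and is not part of ZettaView. The function name is hypothetical, and the host and port come from the address above.

```python
import socket

def is_listening(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        sock = socket.create_connection((host, port), timeout)
    except (socket.error, socket.timeout):
        return False
    sock.close()
    return True

# Example: is_listening("127.0.0.1", 4444)
```

A False result usually means the EXE isn't running, or it failed to start.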

Please note – this is a simple script, which shouldn’t be exposed to the public internet. Please do not port forward to this script.


I'm Anthony Eden, and I'm an IT Professional, Broadcast Technician, Software Developer, and Solutions Engineer. I've been working in broadcast media since 2008, and developing software and websites for just as long. Right now, I provide freelance services through Media Realm - in particular, to the media and not-for-profit industries.

Follow Anthony on Twitter: @anthony_eden