Various Zetta Scripts

Over the last few years of using RCS Zetta, I’ve written a few random scripts that solve little problems here and there. I thought I’d share a few of them with you today.

For many of these scripts, you will need Python 2.7, PyODBC, and a configured ODBC source pointing to your primary Zetta SQL Database.

These are unofficial scripts, and not supported in any way by myself or RCS.

Log Import Email Alerts

Zetta has a good activity logger, but doesn’t provide much in the way of realtime alerting. This script sends a summary email whenever a log is imported to any station in your Zetta system. This can be particularly useful if you want to find out when a traffic-reload happens.

Zetta Log Import Emailer

This script will email you whenever a new log is imported into Zetta
Written by Anthony Eden (

# You need to download this module:

# Email config options

# Zetta database ODBC connection name:
SQLConnectionString = 'DSN=ZettaDb;'

import time
import datetime
import pypyodbc

# Email libraries
import smtplib
from email.mime.text import MIMEText

# Setup Zetta SQL DB Connection:
dbConn = pypyodbc.connect(SQLConnectionString)
cur = dbConn.cursor()

def listImportTimes():
    eventsQuery = '''
        DISTINCT InfoCreatedTime,
        Station.Name AS StationName
    FROM [ZettaDB].[dbo].[xLogger_ActivityLog],
        WHERE ActivityLocationID = 103
        AND xLogger_ActivityLog.TrackerAccountID = xLogger_TrackerAccount.TrackerAccountId
        AND xLogger_ActivityLog.StationID = Station.StationID
        AND InfoCreatedTime >= DATEADD(MINUTE, -10, GETUTCDATE())
        ORDER BY InfoCreatedTime DESC
    events = []
    for d in cur:

        if d[2] == "":
                UserName = "SYSTEM USER"
                UserName = d[2]
                "InfoCreatedTime": d[0],
                "StationID": d[1],
                "StationName": d[5],
                "UserName": UserName,
                "CpuDescription": d[3],
                "TrackerAccountID": d[4]
    return events

def listImportDate(eventTime, accountId):
    assetTypesQuery = '''
    SELECT TOP 1 Changes
        FROM [ZettaDB].[dbo].[xLogger_ActivityLog]
        WHERE ActivityLocationID = 103
        AND InfoCreatedTime = ?
        AND TrackerAccountID = ?
    cur.execute(assetTypesQuery, [eventTime, int(accountId)])

    for d in cur:
        dateString = d[0].split(" Hour ")
        return dateString[0].split("log ")[1]

def listImportAssetTypes(eventTime, accountId):
    assetTypesQuery = '''
    SELECT Changes
        FROM [ZettaDB].[dbo].[xLogger_ActivityLog]
        WHERE ActivityLocationID = 103
        AND InfoCreatedTime = ?
        AND TrackerAccountID = ?
    cur.execute(assetTypesQuery, [eventTime, int(accountId)])

    # Store a count of every asset type found in the new log
    assetTypes = {
        "Other/Unknown": 0

    assetTypesMatch = {
        "Song": "Play Song: ",
        "Link": "Play Link: ",
        "Spot": "Play Spot: ",
        "Exact Time Marker": "Exact Time Marker: ",
        "Unresolved Asset": "Unresolved Asset: ",
        "Top of Hour": "Top of Hour: ",
        "Spot Block": "Spot Block: ",
        "Macro": "Macro: ",
        "Empty Voice Track Slot": "Empty Voice Track Slot: ",
        "Comment": "Comment: ",

    for assetType in assetTypesMatch:
        assetTypes[assetType] = 0
    for d in cur:
        foundMatch = False
        for assetType in assetTypesMatch:
                if assetTypesMatch[assetType] in d[0]:
                        assetTypes[assetType] += 1
                        foundMatch = True
        if foundMatch is False:
            print d[0]
            assetTypes['Other/Unknown'] += 1
    return assetTypes

def sendEmail(to, subject, body):
    msg = MIMEText(body)
    msg['Subject'] = subject
    msg['From'] = EMAIL_FROM
    msg['To'] = to
    s = smtplib.SMTP(EMAIL_SERVER)
    s.sendmail(EMAIL_FROM, [to], msg.as_string())

if __name__ == "__main__":

    notifiedImportTimes = []
    while True:
            for event in listImportTimes():
                    if event['InfoCreatedTime'] not in notifiedImportTimes:
                        spotsOnly = True

                        emailSubject = "Log Imported: " + event['StationName']
                        emailBody = "A log was imported by " + event['UserName']
                        emailBody += " to station " + event['StationName']
                        emailBody += " on computer " + event['CpuDescription'] + "\r\n\r\n"

                        emailBody += "Log Date: " + listImportDate(event['InfoCreatedTime'], event['TrackerAccountID'])  + "\r\n\r\n"

                        emailBody += "Here's a breakdown of what was imported:\r\n"
                        assetBreakdown = listImportAssetTypes(event['InfoCreatedTime'], event['TrackerAccountID'])

                        for assetType in assetBreakdown:
                            if assetType != "Spot" and assetBreakdown[assetType] > 0:
                                spotsOnly = False
                            emailBody += assetType + ": " + str(assetBreakdown[assetType]) + "\r\n"

                        emailBody += "\r\nImport Time: " + str(event['InfoCreatedTime']) + "\r\n"

                        if spotsOnly is True:
                            emailSubject += " (Spots Only)"
                        print emailSubject
                        print emailBody

                        sendEmail(EMAIL_TO, emailSubject, emailBody)
                        #print event
        except Exception, e:
            print "Exception!!!", e

Add Timestamps to Background Recorder Asset Titles

Zetta allows you to record audio files in the background and override assets – useful for capturing satellite and codec feeds. However, there’s no quick way for on-air talent to see when an asset was last updated. This script checks all background-recorded files in the system, and appends a timestamp to the filename.

Zetta Background Recorder Timestamper
This script appends the last-modified timestamp string to assets that are updated by the background recorder.

Written by Anthony Eden (

import pypyodbc
import time
import datetime

# Setup Zetta SQL DB Connection:
SQLConnectionString = 'DSN=ZettaDB;'
dbConn = pypyodbc.connect(SQLConnectionString)
cur = dbConn.cursor()

def utc2local(utc):
    # From
    epoch = time.mktime(utc.timetuple())
    offset = datetime.datetime.fromtimestamp(epoch) - datetime.datetime.utcfromtimestamp(epoch)
    return utc + offset

def listBackgroundRecorderAssets():
	assetsQuery = '''
        WHERE RefId IS NULL
        AND RecordEvent.AssetID IS NOT NULL
        AND RecordEvent.AssetID = Asset.AssetID
        AND AssetToResource.AssetID = Asset.AssetID
        AND AssetToResource.IsPrimaryResource = 1
        AND AssetToResource.ResourceID = [Resource].ResourceID
	assets = []

	rows = cur.fetchall()
	for d in rows:
		localTime = utc2local(d[3])
			"AssetID": d[0],
			"Title": d[2],
      "LastModified": d[3],
			"LastModifiedLocal": localTime,
      "AppendDateString": "[" + localTime.strftime("%d/%m/%Y %I:%M:%S %p") + "]"
	return assets

def updateAssetTitle(assetId, title):
  # Sets a new title for an asset
	assetQuery = ''' UPDATE TOP(1) Asset SET Title = ? WHERE AssetID = ? '''
	cur.execute(assetQuery, (title, assetId))

if __name__ == "__main__":
	while True:
      # Loop over all background recorder assets
			for x in listBackgroundRecorderAssets():
        # Does this asset already have square braces in the title?
				if "[" in x['Title'] and "]" in x['Title'][x['Title'].index("["):]:
          # Replace existing square braces with datetime stamp
					newTitle = x['Title'][:x['Title'].index("[")] + x['AppendDateString'] + x['Title'][x['Title'].index("]", x['Title'].index("[")) + 1:]

					if newTitle != x['Title']:
						print "Update title", newTitle, "for Asset ID", x['AssetID']
						updateAssetTitle(x['AssetID'], newTitle)
          # Append the datetime stamp to the existing title
					newTitle = x['Title'] + " " + x['AppendDateString']
					print "New title", newTitle, "for Asset ID", x['AssetID']
					updateAssetTitle(x['AssetID'], newTitle)
		except Exception, e:
			print "Exception!!!", e

Batch Export Assets Based On External ID

Let’s say you need to export a lot of assets from Zetta, and you have a list of their External IDs (perhaps you got this list from an external music scheduling program, or a reconciliation file). How can you export all these files to a folder without locating the assets manually in the Library?

Simply paste your list of External IDs into the CONFIG_AssetList variable in this script, specify the source and destination directories, and run it.

Zetta Batch Audio Exporter
Given a list of External IDs, find each asset's file in the Zetta Content Store, and copy it to a separate folder.

Written by Anthony Eden (

from shutil import copyfile
import os
import pypyodbc
import re

# The Zetta Content Store location:
CONFIG_StorageLocation = "\\\\zettaaudio1\\ZAUDIO\\"

# The output location:
CONFIG_OutputStorage = "C:\\SCRIPTS\\ZettaBatchExport\\"

# Paste a list of External IDs here (one per line)
CONFIG_AssetList = """

# Setup Zetta SQL DB Connection:
SQLConnectionString = 'DSN=ZettaDB;'
dbConn = pypyodbc.connect(SQLConnectionString)
cur = dbConn.cursor()

def findAssetByExternalID(ExternalID):
    assetQuery = '''
        Asset.AssetID = AssetToResource.AssetID
        AND AssetToResource.isPrimaryResource = 1
        AND AssetToResource.ResourceID = Resource.ResourceID
        AND Asset.AssetID = AutoGen_AssetReadonlyReferences.AssetID
        AND ThirdPartyID = ?
    cur.execute(assetQuery, [str(ExternalID)])
    assetList = []
    for d in cur:
            "ExternalID": str(d[0]),
            "Filename": d[2],
            "Title": str(d[1]),
            "Artist": getArtistName(d[3]),
            "FileFolder": getStorageFolderFromFilename(d[4]),
            "FileExtension": getExtensionFromFilename(d[2]),
            "ResourceID": str(d[4])
    return assetList

def getArtistName(artistId):
    # Take the Artist ID and return the Artist's name

        artistId = int(artistId)
        return None

    artistNameQuery = '''
    SELECT Artist.Name
    FROM Artist
    WHERE ArtistID = ?'''
    cur.execute(artistNameQuery, [artistId])
    for d in cur:
        return d[0]
    return ""

def getStorageFolderFromFilename(filename):
    # Zetta stores 1000 items in each folder (e.g. from 1000 to 1999)
    # This is based on the ResourceID
    folder = str(filename).split("~")[0][:-3]

    if folder == "":
        folder = 0

    return int(folder)

def getExtensionFromFilename(fullname):
    # Returns the file's extension name

    filename, file_extension = os.path.splitext(fullname)
    return file_extension

if __name__ == "__main__":

    IDs = CONFIG_AssetList.split()

    for thisExternalID in IDs:
        # Loop over every specified External ID
        asset = findAssetByExternalID(thisExternalID)[0]

        assetTitle = re.sub(r'\W+', ' ', asset['Title']).replace("  ", " ").strip()
        assetArtist = re.sub(r'\W+', ' ', asset['Artist']).replace("  ", " ").strip()
        srcfile = CONFIG_StorageLocation + str(asset['FileFolder']) + "\\" + asset['Filename']
        dstfile = CONFIG_OutputStorage + assetArtist + " - " + assetTitle + asset['FileExtension']

        if os.path.isfile(dstfile):
            print "DST FILE ALREADY EXISTS: ", dstfile
        elif os.path.isfile(srcfile):
            # Do the copy
            print asset['ExternalID'], "||", dstfile
            copyfile(srcfile, dstfile)
            print "SRC FILE DOESN'T EXIST: ", srcfile

ZettaView: HTML Status Feed Viewer

Update August 2018: ZettaView is now available for download. Check it out on GitHub.

ZettaView is a simple Zetta Status Feed client, which allows you to view the current and upcoming assets for a specific station in a web browser. This can be useful for outside broadcasts, TV displays around the building, or simple remote troubleshooting.

ZettaView is a Python script, with an in-built Flask web server. For convenience, I’ve bundled this script as an EXE.

Simply download the EXE, rename ‘config-sample.json’ to ‘config.json’, put you Zetta Status Feed Server’s address into the config file, run the EXE, and then browse to

Please note – this is a simple script, which shouldn’t be exposed to the public internet. Please do not port forward to this script.

