Over the last few years of using RCS Zetta, I’ve written a few random scripts that solve little problems here and there. I thought I’d share a few of them with you today.
For many of these scripts, you will need Python 2.7, pypyodbc, and a configured ODBC source pointing to your primary Zetta SQL Database.
These are unofficial scripts, and not supported in any way by myself or RCS.
Log Import Email Alerts
Zetta has a good activity logger, but doesn’t provide much in the way of real-time alerting. This script sends a summary email whenever a log is imported to any station in your Zetta system. This can be particularly useful if you want to find out when a traffic reload happens.
"""
Zetta Log Import Emailer
This script will email you whenever a new log is imported into Zetta
Written by Anthony Eden (https://mediarealm.com.au/)
"""
# You need to download this module: https://pypi.python.org/pypi/pypyodbc
# Email config options (fill these in before running; they are empty by default)
EMAIL_TO = ""  # recipient address; the main loop passes it to sendEmail()
EMAIL_FROM = ""  # sender address; used for the From header and the SMTP envelope
EMAIL_SERVER = ""  # host handed to smtplib.SMTP() in sendEmail()
# Zetta database ODBC connection name:
SQLConnectionString = 'DSN=ZettaDb;'
import time
import datetime
import pypyodbc
# Email libraries
import smtplib
from email.mime.text import MIMEText
# Setup Zetta SQL DB Connection:
# NOTE: the connection is opened as soon as this module is loaded, and every
# query function below shares this single module-level cursor.
dbConn = pypyodbc.connect(SQLConnectionString)
cur = dbConn.cursor()
def listImportTimes():
    """
    List the log imports found in the Zetta activity log for the last 10 minutes.

    Reads the activity-log rows with ActivityLocationID 103 (the activity this
    script reports as a log import), joined to the tracker account and station
    tables, newest first.

    Returns a list of dicts with the keys InfoCreatedTime, StationID,
    StationName, UserName, CpuDescription and TrackerAccountID. An empty
    UserName is reported as "SYSTEM USER".
    """
    recentImportsSql = '''
        SELECT
        DISTINCT InfoCreatedTime,
        Station.StationID,
        UserName,
        CpuDescription,
        xLogger_ActivityLog.TrackerAccountID,
        Station.Name AS StationName
        FROM [ZettaDB].[dbo].[xLogger_ActivityLog],
        [ZettaDB].[dbo].[xLogger_TrackerAccount],
        [ZettaDB].[dbo].[Station]
        WHERE ActivityLocationID = 103
        AND xLogger_ActivityLog.TrackerAccountID = xLogger_TrackerAccount.TrackerAccountId
        AND xLogger_ActivityLog.StationID = Station.StationID
        AND InfoCreatedTime >= DATEADD(MINUTE, -10, GETUTCDATE())
        ORDER BY InfoCreatedTime DESC
    '''
    cur.execute(recentImportsSql)

    # Column order follows the SELECT list above:
    # 0 InfoCreatedTime, 1 StationID, 2 UserName, 3 CpuDescription,
    # 4 TrackerAccountID, 5 StationName
    return [
        {
            "InfoCreatedTime": row[0],
            "StationID": row[1],
            "StationName": row[5],
            "UserName": "SYSTEM USER" if row[2] == "" else row[2],
            "CpuDescription": row[3],
            "TrackerAccountID": row[4]
        }
        for row in cur
    ]
def listImportDate(eventTime, accountId):
    """
    Return the log date quoted in an import's activity-log text.

    eventTime -- InfoCreatedTime of the import (as returned by listImportTimes)
    accountId -- TrackerAccountID of the import; converted with int()

    The date is the text found between "log " and " Hour " in the "Changes"
    column of the first matching activity-log row.

    Returns the date text, or "Unknown" when no row matches or the text does
    not have that shape. Earlier versions returned None or raised IndexError in
    those cases, which broke the caller's string concatenation.
    """
    importDateQuery = '''
        SELECT TOP 1 Changes
        FROM [ZettaDB].[dbo].[xLogger_ActivityLog]
        WHERE ActivityLocationID = 103
        AND InfoCreatedTime = ?
        AND TrackerAccountID = ?
    '''
    cur.execute(importDateQuery, [eventTime, int(accountId)])

    row = cur.fetchone()
    if row is None or not row[0]:
        return "Unknown"

    # Keep only what precedes " Hour ", then take the text after "log "
    beforeHour = row[0].split(" Hour ")[0]
    pieces = beforeHour.split("log ")
    if len(pieces) < 2:
        return "Unknown"
    return pieces[1]
def listImportAssetTypes(eventTime, accountId):
    """
    Count the activity-log lines of one import, grouped by asset type.

    eventTime -- InfoCreatedTime of the import
    accountId -- TrackerAccountID of the import; converted with int()

    Each "Changes" value is assigned to the first asset type whose marker text
    it contains. Values matching no marker are printed and counted under
    "Other/Unknown".

    Returns a dict mapping every asset type name (plus "Other/Unknown") to its count.
    """
    changesQuery = '''
        SELECT Changes
        FROM [ZettaDB].[dbo].[xLogger_ActivityLog]
        WHERE ActivityLocationID = 103
        AND InfoCreatedTime = ?
        AND TrackerAccountID = ?
    '''
    cur.execute(changesQuery, [eventTime, int(accountId)])

    # Asset type name -> marker text looked for inside the "Changes" column
    assetTypesMatch = {
        "Song": "Play Song: ",
        "Link": "Play Link: ",
        "Spot": "Play Spot: ",
        "Exact Time Marker": "Exact Time Marker: ",
        "Unresolved Asset": "Unresolved Asset: ",
        "Top of Hour": "Top of Hour: ",
        "Spot Block": "Spot Block: ",
        "Macro": "Macro: ",
        "Empty Voice Track Slot": "Empty Voice Track Slot: ",
        "Comment": "Comment: ",
    }

    # Every known type starts at zero so it always shows up in the result
    assetTypes = {"Other/Unknown": 0}
    for label in assetTypesMatch:
        assetTypes[label] = 0

    for row in cur:
        changeText = row[0]
        for label in assetTypesMatch:
            if assetTypesMatch[label] in changeText:
                assetTypes[label] += 1
                break
        else:
            # No marker matched: show the raw text and count it as unknown
            print(changeText)
            assetTypes['Other/Unknown'] += 1

    return assetTypes
def sendEmail(to, subject, body):
    """
    Send a plain-text email through EMAIL_SERVER.

    to      -- recipient address (used for the To header and the envelope)
    subject -- subject line
    body    -- message text

    The sender is always EMAIL_FROM. Any smtplib/socket error is propagated to
    the caller; the connection is closed first so a failed send does not leak
    the socket.
    """
    msg = MIMEText(body)
    msg['Subject'] = subject
    msg['From'] = EMAIL_FROM
    msg['To'] = to

    s = smtplib.SMTP(EMAIL_SERVER)
    try:
        s.sendmail(EMAIL_FROM, [to], msg.as_string())
    except Exception:
        # Drop the connection without an SMTP QUIT (the server may already be
        # gone), then let the original error reach the caller.
        s.close()
        raise
    s.quit()
def _buildImportEmail(event):
    """
    Build the (subject, body) pair for one import event from listImportTimes().

    The subject gets a " (Spots Only)" suffix when no asset type other than
    "Spot" has a non-zero count in the import's breakdown.
    """
    logDate = listImportDate(event['InfoCreatedTime'], event['TrackerAccountID'])
    assetBreakdown = listImportAssetTypes(event['InfoCreatedTime'], event['TrackerAccountID'])

    spotsOnly = True
    breakdownLines = []
    for assetType in assetBreakdown:
        if assetType != "Spot" and assetBreakdown[assetType] > 0:
            spotsOnly = False
        breakdownLines.append("%s: %s\r\n" % (assetType, assetBreakdown[assetType]))

    # %-formatting (rather than "+") so a NULL column cannot raise TypeError
    emailSubject = "Log Imported: %s" % (event['StationName'],)
    if spotsOnly:
        emailSubject += " (Spots Only)"

    emailBody = "A log was imported by %s to station %s on computer %s\r\n\r\n" % (
        event['UserName'], event['StationName'], event['CpuDescription'])
    emailBody += "Log Date: %s\r\n\r\n" % (logDate,)
    emailBody += "Here's a breakdown of what was imported:\r\n"
    emailBody += "".join(breakdownLines)
    emailBody += "\r\nImport Time: %s\r\n" % (event['InfoCreatedTime'],)

    return emailSubject, emailBody


if __name__ == "__main__":
    # Imports already handled, keyed by (InfoCreatedTime, StationID, TrackerAccountID)
    # so that imports sharing a timestamp on different stations are not merged.
    notifiedImports = set()

    while True:
        try:
            currentImports = set()

            for event in listImportTimes():
                importKey = (event['InfoCreatedTime'], event['StationID'], event['TrackerAccountID'])
                currentImports.add(importKey)
                if importKey in notifiedImports:
                    continue

                # Marked before sending: an import that fails to send is
                # reported at most once rather than retried on every poll.
                notifiedImports.add(importKey)

                emailSubject, emailBody = _buildImportEmail(event)
                print(emailSubject)
                print(emailBody)
                sendEmail(EMAIL_TO, emailSubject, emailBody)
                print("")

            # listImportTimes() only returns the last 10 minutes, so anything it
            # no longer returns can be forgotten; this keeps the set from
            # growing for the lifetime of the process.
            notifiedImports &= currentImports

            time.sleep(20)

        except Exception as e:
            # Top-level boundary: report, back off, and keep polling
            print("Exception!!! %s" % (e,))
            time.sleep(120)
Add Timestamps to Background Recorder Asset Titles
Zetta allows you to record audio files in the background and override assets – useful for capturing satellite and codec feeds. However, there’s no quick way for on-air talent to see when an asset was last updated. This script checks all background-recorded assets in the system, and appends a last-modified timestamp to each asset’s title.
"""
Zetta Background Recorder Timestamper
This script appends the last-modified timestamp string to assets that are updated by the background recorder.
Written by Anthony Eden (https://mediarealm.com.au)
"""
import pypyodbc
import time
import datetime
# Setup Zetta SQL DB Connection:
# The DSN name must match an ODBC data source configured on this machine.
SQLConnectionString = 'DSN=ZettaDB;'
# NOTE: the connection is opened as soon as this module is loaded; the single
# cursor is shared by every function below, and updateAssetTitle() commits
# through dbConn.
dbConn = pypyodbc.connect(SQLConnectionString)
cur = dbConn.cursor()
def utc2local(utc):
    """
    Convert a naive UTC datetime into a naive datetime in the machine's local time.

    utc -- datetime.datetime holding a UTC wall-clock time (no tzinfo)

    Returns a naive datetime.datetime in local time; microseconds are preserved.
    """
    # Local import: calendar is not among this script's top-level imports
    import calendar

    # timegm() reads the tuple as UTC. The earlier time.mktime() version read it
    # as local time, so the UTC offset was sampled at the wrong instant and came
    # out wrong for timestamps close to a daylight-saving change.
    epoch = calendar.timegm(utc.timetuple())
    local = datetime.datetime.fromtimestamp(epoch)

    # timetuple() has whole-second resolution; restore the sub-second part
    return local.replace(microsecond=utc.microsecond)
def listBackgroundRecorderAssets():
    """
    List the assets that are targets of background recording events.

    Selects RecordEvent rows that have a NULL RefId and a non-NULL AssetID,
    joined to the asset and to its primary resource.

    Returns a list of dicts with the keys:
      AssetID           -- the asset's ID
      Title             -- the asset's current title
      LastModified      -- the primary resource's infoModifiedDate, as stored
      LastModifiedLocal -- the same value passed through utc2local()
      AppendDateString  -- "[dd/mm/YYYY HH:MM:SS AM/PM]" built from the local time
    """
    recorderAssetsSql = '''
        SELECT
        RecordEvent.AssetID,
        Asset.infoModifiedDate,
        Asset.Title,
        [Resource].infoModifiedDate
        FROM
        [RecordEvent],
        [Asset],
        [AssetToResource],
        [Resource]
        WHERE RefId IS NULL
        AND RecordEvent.AssetID IS NOT NULL
        AND RecordEvent.AssetID = Asset.AssetID
        AND AssetToResource.AssetID = Asset.AssetID
        AND AssetToResource.IsPrimaryResource = 1
        AND AssetToResource.ResourceID = [Resource].ResourceID
    '''
    cur.execute(recorderAssetsSql)

    results = []
    for row in cur.fetchall():
        # Column 3 is the resource's modified date; column 1 (the asset's own
        # modified date) is selected but not used.
        resourceModified = row[3]
        modifiedLocal = utc2local(resourceModified)
        stamp = "[%s]" % modifiedLocal.strftime("%d/%m/%Y %I:%M:%S %p")
        results.append({
            "AssetID": row[0],
            "Title": row[2],
            "LastModified": resourceModified,
            "LastModifiedLocal": modifiedLocal,
            "AppendDateString": stamp
        })
    return results
def updateAssetTitle(assetId, title):
    """
    Store a new title on an asset and commit the change.

    assetId -- AssetID of the row to update
    title   -- the replacement title text
    """
    updateSql = ''' UPDATE TOP(1) Asset SET Title = ? WHERE AssetID = ? '''
    parameters = (title, assetId)
    cur.execute(updateSql, parameters)
    dbConn.commit()
if __name__ == "__main__":
    # Poll forever: every 20 seconds normally, 120 seconds after an error
    while True:
        try:
            for asset in listBackgroundRecorderAssets():
                title = asset['Title']
                stamp = asset['AppendDateString']
                assetId = asset['AssetID']

                # Find the first "[" and the first "]" at or after it (-1 when absent)
                openIdx = title.index("[") if "[" in title else -1
                closeIdx = title.find("]", openIdx) if openIdx >= 0 else -1

                if closeIdx == -1:
                    # No bracketed section yet: add the stamp to the end of the title
                    newTitle = title + " " + stamp
                    print("New title %s for Asset ID %s" % (newTitle, assetId))
                    updateAssetTitle(assetId, newTitle)
                    continue

                # Swap the existing bracketed section for the current stamp,
                # writing to the database only when the title actually changes
                newTitle = title[:openIdx] + stamp + title[closeIdx + 1:]
                if newTitle != title:
                    print("Update title %s for Asset ID %s" % (newTitle, assetId))
                    updateAssetTitle(assetId, newTitle)

            time.sleep(20)

        except Exception as e:
            print("Exception!!! %s" % (e,))
            time.sleep(120)
Batch Export Assets Based On External ID
Let’s say you need to export a lot of assets from Zetta, and you have a list of their External IDs (perhaps you got this list from an external music scheduling program, or a reconciliation file). How can you export all these files to a folder without locating the assets manually in the Library?
Simply paste your list of External IDs into the CONFIG_AssetList variable in this script, specify the source and destination directories, and run it.
"""
Zetta Batch Audio Exporter
Given a list of External IDs, find each asset's file in the Zetta Content Store, and copy it to a separate folder.
Written by Anthony Eden (https://mediarealm.com.au)
"""
from shutil import copyfile
import os
import pypyodbc
import re
# The Zetta Content Store location:
# (the main block appends "<folder>\<filename>" directly, so keep the trailing backslash)
CONFIG_StorageLocation = "\\\\zettaaudio1\\ZAUDIO\\"
# The output location:
# (the output file name is appended directly, so keep the trailing backslash)
CONFIG_OutputStorage = "C:\\SCRIPTS\\ZettaBatchExport\\"
# Paste a list of External IDs here (one per line)
# The list is split on whitespace, so blank lines are ignored.
CONFIG_AssetList = """
12345
67890
"""
# Setup Zetta SQL DB Connection:
# NOTE: the connection is opened as soon as this module is loaded, and the
# single cursor is shared by every query function below.
SQLConnectionString = 'DSN=ZettaDB;'
dbConn = pypyodbc.connect(SQLConnectionString)
cur = dbConn.cursor()
def findAssetByExternalID(ExternalID):
    """
    Look up the assets whose ThirdPartyID equals the given External ID.

    ExternalID -- the External ID to search for; converted with str()

    Returns a list of dicts (empty when nothing matches) with the keys
    ExternalID, Filename, Title, Artist, FileFolder, FileExtension and
    ResourceID. Only each asset's primary resource is considered.
    """
    assetQuery = '''
        SELECT
        Asset.ThirdPartyID,
        Asset.Title,
        Resource.StorageFile,
        AutoGen_AssetReadonlyReferences.ArtistID_1,
        Resource.ResourceID
        FROM
        Asset,
        Resource,
        AssetToResource,
        AutoGen_AssetReadonlyReferences
        WHERE
        Asset.AssetID = AssetToResource.AssetID
        AND AssetToResource.isPrimaryResource = 1
        AND AssetToResource.ResourceID = Resource.ResourceID
        AND Asset.AssetID = AutoGen_AssetReadonlyReferences.AssetID
        AND ThirdPartyID = ?
    '''
    cur.execute(assetQuery, [str(ExternalID)])

    # Read every row before building the result: getArtistName() runs its own
    # query on the same shared cursor, which would otherwise replace the result
    # set while it is still being iterated.
    rows = cur.fetchall()

    assetList = []
    for d in rows:
        assetList.append({
            "ExternalID": str(d[0]),
            "Filename": d[2],
            "Title": str(d[1]),
            "Artist": getArtistName(d[3]),
            # The storage folder is derived from the ResourceID (column 4)
            "FileFolder": getStorageFolderFromFilename(d[4]),
            "FileExtension": getExtensionFromFilename(d[2]),
            "ResourceID": str(d[4])
        })
    return assetList
def getArtistName(artistId):
    """
    Take the Artist ID and return the Artist's name.

    artistId -- value convertible with int() (typically the ArtistID_1 column)

    Returns the artist's name, "" when no artist has that ID, or None when
    artistId cannot be converted to an integer (e.g. it is None).
    """
    try:
        artistId = int(artistId)
    except (TypeError, ValueError):
        # Only conversion failures mean "no artist"; anything else
        # (KeyboardInterrupt included) must not be swallowed here.
        return None

    artistNameQuery = '''
        SELECT Artist.Name
        FROM Artist
        WHERE ArtistID = ?'''
    cur.execute(artistNameQuery, [artistId])

    row = cur.fetchone()
    if row is None:
        return ""
    return row[0]
def getStorageFolderFromFilename(filename):
    """
    Return the Content Store folder number for a resource.

    Zetta stores 1000 items in each folder (e.g. from 1000 to 1999), based on
    the ResourceID, so the folder is the number with its last three digits
    removed.

    filename -- a ResourceID, or text that starts with one followed by "~"

    Returns the folder number as an int (0 for values below 1000).
    """
    # Everything ahead of the first "~" is the numeric part
    numericPart = str(filename).partition("~")[0]
    thousands = numericPart[:-3]
    if thousands == "":
        return 0
    return int(thousands)
def getExtensionFromFilename(fullname):
    """
    Return the extension of a file name, including the leading dot.

    fullname -- file name or path

    Returns "" when the name has no extension.
    """
    return os.path.splitext(fullname)[1]
if __name__ == "__main__":
    # Loop over every specified External ID (whitespace-separated)
    for thisExternalID in CONFIG_AssetList.split():
        matches = findAssetByExternalID(thisExternalID)
        if not matches:
            # An unknown ID used to raise IndexError and abort the whole batch
            print("NO ASSET FOUND FOR EXTERNAL ID:  %s" % (thisExternalID,))
            continue
        asset = matches[0]

        # Collapse every run of non-word characters into one space so the
        # names are safe to use in a file name. (No further space clean-up is
        # needed: the substitution cannot leave two spaces in a row.)
        assetTitle = re.sub(r'\W+', ' ', asset['Title']).strip()
        # Artist is None when the asset has no usable artist ID
        assetArtist = re.sub(r'\W+', ' ', asset['Artist'] or "").strip()

        srcfile = CONFIG_StorageLocation + str(asset['FileFolder']) + "\\" + asset['Filename']
        dstfile = CONFIG_OutputStorage + assetArtist + " - " + assetTitle + asset['FileExtension']

        if os.path.isfile(dstfile):
            # Never overwrite an existing export
            print("DST FILE ALREADY EXISTS:  %s" % (dstfile,))
        elif os.path.isfile(srcfile):
            # Do the copy
            print("%s || %s" % (asset['ExternalID'], dstfile))
            copyfile(srcfile, dstfile)
        else:
            print("SRC FILE DOESN'T EXIST:  %s" % (srcfile,))
ZettaView: HTML Status Feed Viewer
Update August 2018: ZettaView is now available for download. Check it out on GitHub.
ZettaView is a simple Zetta Status Feed client, which allows you to view the current and upcoming assets for a specific station in a web browser. This can be useful for outside broadcasts, TV displays around the building, or simple remote troubleshooting.
ZettaView is a Python script, with an in-built Flask web server. For convenience, I’ve bundled this script as an EXE.
Simply download the EXE, rename ‘config-sample.json’ to ‘config.json’, put your Zetta Status Feed Server’s address into the config file, run the EXE, and then browse to http://127.0.0.1:4444.
Please note – this is a simple script, which shouldn’t be exposed to the public internet. Please do not port forward to this script.