| name | as400-interface |
| description | Interface with IBM AS/400 (iSeries) systems via command execution, file access, and program calls. Use when integrating with AS/400 systems, executing CL commands, or accessing native AS/400 files and programs. |
AS/400 Interface
Interact with IBM AS/400 (iSeries) systems from Python.
Connection Setup
import pyodbc
# ODBC connection to AS/400
conn_str = (
"DRIVER={iSeries Access ODBC Driver};"
"SYSTEM=as400.example.com;"
"UID=username;"
"PWD=password;"
"DBQ=MYLIB;"
)
conn = pyodbc.connect(conn_str)
cursor = conn.cursor()
Accessing AS/400 Files
# Query physical file
cursor.execute("SELECT * FROM MYLIB.CUSTFILE")
for row in cursor.fetchall():
print(row)
# Query with WHERE clause
cursor.execute("""
SELECT CUSTNO, CUSTNAM, CUSTBAL
FROM MYLIB.CUSTFILE
WHERE CUSTBAL > ?
""", (1000.00,))
customers = cursor.fetchall()
Calling AS/400 Programs
# Call CL command
def run_cl_command(conn, command):
"""Execute CL command on AS/400."""
cursor = conn.cursor()
try:
cursor.execute(f"CALL QSYS.QCMDEXC('{command}', {len(command):015.5f})")
conn.commit()
except Exception as e:
print(f"Command failed: {e}")
conn.rollback()
# Usage
run_cl_command(conn, "CLRPFM FILE(MYLIB/TEMPFILE)")
Data Queue Operations
# Read from data queue
def read_data_queue(conn, library, queue_name, timeout=5):
"""Read entry from AS/400 data queue."""
sql = f"""
SELECT ENTRY
FROM {library}.{queue_name}
WHERE WAIT_TIME = {timeout}
"""
cursor = conn.cursor()
cursor.execute(sql)
result = cursor.fetchone()
return result[0] if result else None
# Write to data queue
def write_data_queue(conn, library, queue_name, data):
"""Write entry to AS/400 data queue."""
cursor = conn.cursor()
cursor.execute(
f"INSERT INTO {library}.{queue_name} VALUES (?)",
(data,)
)
conn.commit()
IFS File Access
import ftplib
def download_ifs_file(host, user, password, remote_path, local_path):
"""Download file from AS/400 IFS."""
ftp = ftplib.FTP(host)
ftp.login(user, password)
with open(local_path, 'wb') as f:
ftp.retrbinary(f'RETR {remote_path}', f.write)
ftp.quit()
def upload_ifs_file(host, user, password, local_path, remote_path):
"""Upload file to AS/400 IFS."""
ftp = ftplib.FTP(host)
ftp.login(user, password)
with open(local_path, 'rb') as f:
ftp.storbinary(f'STOR {remote_path}', f)
ftp.quit()
Job Management
def submit_job(conn, library, program, parameters=''):
"""Submit batch job to AS/400."""
command = f"SBMJOB CMD(CALL PGM({library}/{program}) PARM({parameters}))"
run_cl_command(conn, command)
def check_job_status(conn, job_name, user, job_number):
"""Check status of AS/400 job."""
cursor = conn.cursor()
cursor.execute("""
SELECT JOB_STATUS, JOB_STATUS_MESSAGE
FROM QSYS2.JOB_INFO
WHERE JOB_NAME = ? AND USER_NAME = ? AND JOB_NUMBER = ?
""", (job_name, user, job_number))
result = cursor.fetchone()
return result if result else None
Spool File Access
def read_spool_file(conn, job_name, user, file_name):
"""Read AS/400 spool file content."""
cursor = conn.cursor()
cursor.execute("""
SELECT SPOOLED_DATA
FROM QSYS2.OUTPUT_QUEUE_ENTRIES
WHERE JOB_NAME = ?
AND USER_NAME = ?
AND SPOOLED_FILE_NAME = ?
""", (job_name, user, file_name))
result = cursor.fetchone()
return result[0] if result else None
Connection Pool
class AS400ConnectionPool:
def __init__(self, conn_str, pool_size=5):
self.conn_str = conn_str
self.pool = []
for _ in range(pool_size):
self.pool.append(pyodbc.connect(conn_str))
def get_connection(self):
return self.pool.pop() if self.pool else pyodbc.connect(self.conn_str)
def return_connection(self, conn):
self.pool.append(conn)
def close_all(self):
for conn in self.pool:
conn.close()
Complete Example
def sync_inventory_from_as400(conn_str):
"""Sync inventory data from AS/400 system."""
conn = pyodbc.connect(conn_str)
cursor = conn.cursor()
try:
# Read inventory file
cursor.execute("""
SELECT ITEMNO, ITMDSC, QTYOH, UNITPR
FROM INVLIB.INVMAST
WHERE QTYOH > 0
""")
inventory = []
for row in cursor.fetchall():
inventory.append({
'item_number': row.ITEMNO.strip(),
'description': row.ITMDSC.strip(),
'quantity': int(row.QTYOH),
'price': float(row.UNITPR)
})
return inventory
finally:
cursor.close()
conn.close()