Claude Code Plugins

Community-maintained marketplace

Feedback
17
0

Interface with IBM AS/400 (iSeries) systems via command execution, file access, and program calls. Use when integrating with AS/400 systems, executing CL commands, or accessing native AS/400 files and programs.

Install Skill

1. Download skill
2. Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3. Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify the skill by reading through its instructions before using it.

SKILL.md

name: as400-interface
description: Interface with IBM AS/400 (iSeries) systems via command execution, file access, and program calls. Use when integrating with AS/400 systems, executing CL commands, or accessing native AS/400 files and programs.

AS/400 Interface

Interact with IBM AS/400 (iSeries) systems from Python.

Connection Setup

import pyodbc

# Assemble the ODBC connection string for the AS/400 host; each fragment
# below is one "KEYWORD=value;" attribute of that string.
conn_str = "".join([
    "DRIVER={iSeries Access ODBC Driver};",
    "SYSTEM=as400.example.com;",
    "UID=username;",
    "PWD=password;",
    "DBQ=MYLIB;",
])

# Open the connection, then take a cursor from it for running statements.
conn = pyodbc.connect(conn_str)
cursor = conn.cursor()

Accessing AS/400 Files

# Dump the physical file: every column of every record, one row per line.
cursor.execute("SELECT * FROM MYLIB.CUSTFILE")
for row in cursor.fetchall():
    print(row)

# Filtered query: the balance threshold is bound through the `?` marker
# instead of being written into the SQL text.
cursor.execute(
    """
    SELECT CUSTNO, CUSTNAM, CUSTBAL
    FROM MYLIB.CUSTFILE
    WHERE CUSTBAL > ?
""",
    (1000.00,),
)

customers = cursor.fetchall()

Calling AS/400 Programs

# Call CL command
def run_cl_command(conn, command):
    """Execute a CL command on the AS/400 by calling QSYS.QCMDEXC.

    Args:
        conn: Open database connection; must provide ``cursor()``,
            ``commit()`` and ``rollback()``.
        command: CL command text, e.g. ``"CLRPFM FILE(MYLIB/TEMPFILE)"``.

    Returns:
        None. A failure is reported with ``print`` and the transaction is
        rolled back; the exception is not re-raised.
    """
    # The command is embedded in a SQL string literal, so every apostrophe in
    # it (e.g. MSG('text')) has to be doubled or it would end the literal
    # early and corrupt the statement.
    quoted = command.replace("'", "''")
    # The length argument describes the command itself, not its escaped form;
    # it is rendered zero-padded, 15 characters wide with 5 decimals.
    length = f"{len(command):015.5f}"

    cursor = conn.cursor()
    try:
        cursor.execute(f"CALL QSYS.QCMDEXC('{quoted}', {length})")
        conn.commit()
    except Exception as e:
        print(f"Command failed: {e}")
        conn.rollback()
    finally:
        # Release the statement handle whether or not the call succeeded.
        cursor.close()

# Usage: pass the open connection and the CL command text as a plain string.
run_cl_command(conn, "CLRPFM FILE(MYLIB/TEMPFILE)")

Data Queue Operations

# Read from data queue
def read_data_queue(conn, library, queue_name, timeout=5):
    """Read one entry from an AS/400 data queue.

    Args:
        conn: Open database connection providing ``cursor()``.
        library: Library qualifier; inserted into the SQL text as-is.
        queue_name: Queue object name; inserted into the SQL text as-is.
        timeout: Value compared against ``WAIT_TIME`` (default 5).

    Returns:
        The first column of the first row returned, or ``None`` when the
        query produced no row.
    """
    # Identifiers cannot be bound as parameters, so library/queue_name are
    # still interpolated -- only pass trusted names. The timeout is a plain
    # value and is bound through a `?` marker.
    # NOTE(review): IBM i also provides a QSYS2.RECEIVE_DATA_QUEUE table
    # function for *DTAQ objects -- confirm this table-style access matches
    # the target system.
    sql = f"""
        SELECT ENTRY
        FROM {library}.{queue_name}
        WHERE WAIT_TIME = ?
    """
    cursor = conn.cursor()
    try:
        cursor.execute(sql, (timeout,))
        result = cursor.fetchone()
    finally:
        # Release the statement handle even if the query fails.
        cursor.close()
    return result[0] if result else None

# Write to data queue
def write_data_queue(conn, library, queue_name, data):
    """Append one entry to an AS/400 data queue and commit it.

    The target is addressed as ``library.queue_name`` in the SQL text, while
    ``data`` travels as a bound parameter.
    """
    writer = conn.cursor()
    statement = f"INSERT INTO {library}.{queue_name} VALUES (?)"
    payload = (data,)
    writer.execute(statement, payload)
    conn.commit()

IFS File Access

import ftplib

def download_ifs_file(host, user, password, remote_path, local_path):
    """Download a file from the AS/400 IFS over FTP in binary mode.

    Args:
        host: FTP host name or address of the AS/400.
        user: Login user name.
        password: Login password.
        remote_path: Path of the file on the server (sent with ``RETR``).
        local_path: Local file to write; opened in ``'wb'`` mode, so an
            existing file is overwritten.
    """
    # The context manager shuts the FTP session down even when login, the
    # local open() or the transfer raises, so the control connection is
    # never left open on an error path.
    with ftplib.FTP(host) as ftp:
        ftp.login(user, password)

        with open(local_path, 'wb') as f:
            ftp.retrbinary(f'RETR {remote_path}', f.write)

def upload_ifs_file(host, user, password, local_path, remote_path):
    """Upload a local file to the AS/400 IFS over FTP in binary mode.

    Args:
        host: FTP host name or address of the AS/400.
        user: Login user name.
        password: Login password.
        local_path: Local file to send; opened in ``'rb'`` mode.
        remote_path: Destination path on the server (sent with ``STOR``).
    """
    # The context manager shuts the FTP session down even when login, the
    # local open() or the transfer raises, so the control connection is
    # never left open on an error path.
    with ftplib.FTP(host) as ftp:
        ftp.login(user, password)

        with open(local_path, 'rb') as f:
            ftp.storbinary(f'STOR {remote_path}', f)

Job Management

def submit_job(conn, library, program, parameters=''):
    """Submit a batch job to the AS/400 that calls ``library/program``.

    Args:
        conn: Open database connection, handed on to ``run_cl_command``.
        library: Library containing the program.
        program: Program name to call.
        parameters: Text placed inside ``PARM(...)``; when empty (the
            default) the ``PARM`` keyword is left out entirely.

    The command is built as ``SBMJOB CMD(CALL PGM(...) [PARM(...)])`` and
    executed through ``run_cl_command``.
    """
    call = f"CALL PGM({library}/{program})"
    # Only add PARM(...) when there is something to pass; otherwise the
    # default parameters='' would put an empty "PARM()" into the command.
    if parameters:
        call += f" PARM({parameters})"
    run_cl_command(conn, f"SBMJOB CMD({call})")

def check_job_status(conn, job_name, user, job_number):
    """Look up the status columns recorded for one AS/400 job.

    The job is identified by name, user and number, all bound as query
    parameters. Returns the fetched row (JOB_STATUS, JOB_STATUS_MESSAGE),
    or None when nothing was fetched.
    """
    query = """
        SELECT JOB_STATUS, JOB_STATUS_MESSAGE
        FROM QSYS2.JOB_INFO
        WHERE JOB_NAME = ? AND USER_NAME = ? AND JOB_NUMBER = ?
    """
    job_key = (job_name, user, job_number)

    lookup = conn.cursor()
    lookup.execute(query, job_key)
    row = lookup.fetchone()

    if not row:
        return None
    return row

Spool File Access

def read_spool_file(conn, job_name, user, file_name):
    """Fetch the SPOOLED_DATA value of one AS/400 spooled file.

    The file is selected by job name, user name and spooled file name, all
    bound as query parameters. Returns the first column of the first
    matching row, or None when nothing was fetched.
    """
    query = """
        SELECT SPOOLED_DATA
        FROM QSYS2.OUTPUT_QUEUE_ENTRIES
        WHERE JOB_NAME = ?
          AND USER_NAME = ?
          AND SPOOLED_FILE_NAME = ?
    """
    selector = (job_name, user, file_name)

    reader = conn.cursor()
    reader.execute(query, selector)
    row = reader.fetchone()

    if row:
        return row[0]
    return None

Connection Pool

class AS400ConnectionPool:
    """Minimal pool of pre-opened pyodbc connections for one connection string.

    Idle connections are kept in the plain list ``self.pool``. No locking is
    done here, so callers sharing one pool between threads must synchronize
    access themselves.
    """

    def __init__(self, conn_str, pool_size=5):
        """Open ``pool_size`` connections up front.

        Args:
            conn_str: ODBC connection string passed to ``pyodbc.connect``.
            pool_size: Number of connections to open immediately.

        Raises:
            Whatever ``pyodbc.connect`` raises; connections opened before the
            failure are closed first.
        """
        self.conn_str = conn_str
        self.pool = []
        try:
            for _ in range(pool_size):
                self.pool.append(pyodbc.connect(conn_str))
        except Exception:
            # Don't strand the connections that were opened before the
            # failing one; close them and let the caller see the error.
            self.close_all()
            raise

    def get_connection(self):
        """Return an idle connection, or open a new one if none is idle."""
        return self.pool.pop() if self.pool else pyodbc.connect(self.conn_str)

    def return_connection(self, conn):
        """Put ``conn`` back into the pool for reuse."""
        self.pool.append(conn)

    def close_all(self):
        """Close every idle connection and empty the pool."""
        # Pop while closing so a closed connection never stays in the list,
        # where get_connection() could hand it out again.
        while self.pool:
            self.pool.pop().close()

Complete Example

def sync_inventory_from_as400(conn_str):
    """Read in-stock inventory rows from INVLIB.INVMAST on the AS/400.

    Args:
        conn_str: ODBC connection string passed to ``pyodbc.connect``.

    Returns:
        A list of dicts, one per row with ``QTYOH > 0``, with the keys
        ``item_number`` and ``description`` (stripped strings),
        ``quantity`` (int) and ``price`` (float).

    The connection and cursor are always closed before returning or raising.
    """
    query = """
        SELECT ITEMNO, ITMDSC, QTYOH, UNITPR
        FROM INVLIB.INVMAST
        WHERE QTYOH > 0
    """

    conn = pyodbc.connect(conn_str)
    try:
        # The cursor is created inside the outer try so the connection is
        # still closed if cursor creation itself fails.
        cursor = conn.cursor()
        try:
            cursor.execute(query)
            # NOTE(review): float() on UNITPR may lose exactness if the
            # column is a decimal type -- confirm callers accept that.
            return [
                {
                    'item_number': row.ITEMNO.strip(),
                    'description': row.ITMDSC.strip(),
                    'quantity': int(row.QTYOH),
                    'price': float(row.UNITPR),
                }
                for row in cursor.fetchall()
            ]
        finally:
            cursor.close()
    finally:
        conn.close()