目前,我正在使用基本 API 获取从云安全供应商获取的日志,但这不是很干净的过程。我想转换到他们提供的事件流 (AMQPS) 服务。
我已经设置了队列,并在云端拥有相关信息,但我遇到的问题是找出接受该流并将其转储到 syslog 的最佳方法。我现在正在使用 syslog-ng,但它似乎只能发送到 RabbitMQ 而不能接受它。
我目前正在研究 Pika,看看它是否可以接受 AMQPS,然后将其汇集到日志文件,但这就是我遇到一些问题的地方。任何帮助都将不胜感激。
谢谢
答案1
我找到了 walbit 制作的 github 页面(https://github.com/walbit/AMP_API),这对我来说似乎工作正常。我使用的是“consume_event_stream_by_name.py”,它会将所有输入转储到标准输出。我做了一些修改,将其也发送到 syslog,现在 syslog-ng 正在接受它。
我目前对此代码的唯一问题是,如果连接没有接入,它似乎会不时超时或完全死机。所以我需要弄清楚发生了什么。
#!/usr/bin/env python
import argparse
import pika
import pprint
import requests
import sys
import logging
import logging.handlers
# YOU NEED TO CREATE AN auth.py FILE WITH CLIENT_ID AND API_KEY STRINGS
from auth import CLIENT_ID, API_KEY
parser = argparse.ArgumentParser()
parser.add_argument('event_stream_name', metavar='event_stream_name',
nargs=1, help='event stream name')
parser.parse_args()
event_stream_name = parser.parse_args().event_stream_name[0]
api_endpoint = 'https://api.amp.cisco.com/v1/event_streams'
session = requests.Session()
session.auth = (CLIENT_ID, API_KEY)
event_streams = session.get(api_endpoint).json()['data']
event_stream = {}
for e in event_streams:
if e['name'] is event_stream_name:
event_stream = e
amqp_url = 'amqps://{user_name}:{password}@{host}:{port}'.format(
**e['amqp_credentials'])
queue = e['amqp_credentials']['queue_name']
parameters = pika.URLParameters(amqp_url)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
parameters = pika.URLParameters(amqp_url)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
my_logger = logging.getLogger('MyLogger')
my_logger.setLevel(logging.INFO)
handler = logging.handlers.SysLogHandler(address = '/dev/log')
my_logger.addHandler(handler)
def callback(ch, method, properties, body):
# print(" [x] Received meth:\t%r" % method)
# print(" [x] Received prop:\t%r" % properties)
# print(" [x] Received body:\t%r" % body)
# print(body)
my_logger.info('Fireamp: ' + body)
channel.basic_consume(callback, queue, no_ack=True)
print(" [*] Connecting to:\t%r" % amqp_url)
print(" [*] Waiting for messages. To exit press CTRL+C")
channel.start_consuming()